feat: tsma refactor
This commit is contained in:
parent
988f4126b4
commit
ed829455a9
|
@ -98,7 +98,6 @@ extern char *qtypeStr[];
|
||||||
#undef TD_DEBUG_PRINT_ROW
|
#undef TD_DEBUG_PRINT_ROW
|
||||||
#undef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
|
#undef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
|
||||||
#undef TD_DEBUG_PRINT_TAG
|
#undef TD_DEBUG_PRINT_TAG
|
||||||
#define TD_DEBUG_SMA_ID 123456
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -2345,7 +2345,6 @@ typedef struct {
|
||||||
char indexName[TSDB_INDEX_NAME_LEN];
|
char indexName[TSDB_INDEX_NAME_LEN];
|
||||||
int32_t exprLen;
|
int32_t exprLen;
|
||||||
int32_t tagsFilterLen;
|
int32_t tagsFilterLen;
|
||||||
int32_t numOfVgroups; // for dstVgroup
|
|
||||||
int64_t indexUid;
|
int64_t indexUid;
|
||||||
tb_uid_t tableUid; // super/child/common table uid
|
tb_uid_t tableUid; // super/child/common table uid
|
||||||
tb_uid_t dstTbUid; // for dstVgroup
|
tb_uid_t dstTbUid; // for dstVgroup
|
||||||
|
@ -2355,7 +2354,6 @@ typedef struct {
|
||||||
char* dstTbName; // for dstVgroup
|
char* dstTbName; // for dstVgroup
|
||||||
char* expr; // sma expression
|
char* expr; // sma expression
|
||||||
char* tagsFilter;
|
char* tagsFilter;
|
||||||
SVgEpSet* pVgEpSet; // for dstVgroup
|
|
||||||
SSchemaWrapper schemaRow; // for dstVgroup
|
SSchemaWrapper schemaRow; // for dstVgroup
|
||||||
SSchemaWrapper schemaTag; // for dstVgroup
|
SSchemaWrapper schemaTag; // for dstVgroup
|
||||||
} STSma; // Time-range-wise SMA
|
} STSma; // Time-range-wise SMA
|
||||||
|
@ -2442,49 +2440,6 @@ static int32_t tDecodeTSmaWrapper(SDecoder* pDecoder, STSmaWrapper* pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t indexUid;
|
|
||||||
STimeWindow queryWindow;
|
|
||||||
} SVGetTsmaExpWndsReq;
|
|
||||||
|
|
||||||
#define SMA_WNDS_EXPIRE_FLAG (0x1)
|
|
||||||
#define SMA_WNDS_IS_EXPIRE(flag) (((flag)&SMA_WNDS_EXPIRE_FLAG) != 0)
|
|
||||||
#define SMA_WNDS_SET_EXPIRE(flag) ((flag) |= SMA_WNDS_EXPIRE_FLAG)
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t indexUid;
|
|
||||||
int8_t flags; // 0x1 all window expired
|
|
||||||
int32_t numExpWnds;
|
|
||||||
TSKEY wndSKeys[];
|
|
||||||
} SVGetTsmaExpWndsRsp;
|
|
||||||
|
|
||||||
int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder* pCoder, const SVGetTsmaExpWndsReq* pReq);
|
|
||||||
int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder* pCoder, SVGetTsmaExpWndsReq* pReq);
|
|
||||||
int32_t tEncodeSVGetTSmaExpWndsRsp(SEncoder* pCoder, const SVGetTsmaExpWndsRsp* pReq);
|
|
||||||
int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder* pCoder, SVGetTsmaExpWndsRsp* pReq);
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t nKeys; // n consecutive keys since skey
|
|
||||||
int64_t skey;
|
|
||||||
} SVTsmaExpWndItem;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t indexUid;
|
|
||||||
int64_t version; // tsma result version
|
|
||||||
int64_t nItems;
|
|
||||||
SVTsmaExpWndItem items[];
|
|
||||||
} SVClrTsmaExpWndsReq;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t indexUid;
|
|
||||||
int32_t code;
|
|
||||||
} SVClrTsmaExpWndsRsp;
|
|
||||||
|
|
||||||
int32_t tEncodeSVClrTsmaExpWndsReq(SEncoder* pCoder, const SVClrTsmaExpWndsReq* pReq);
|
|
||||||
int32_t tDecodeSVClrTsmaExpWndsReq(SDecoder* pCoder, SVClrTsmaExpWndsReq* pReq);
|
|
||||||
int32_t tEncodeSVClrTsmaExpWndsRsp(SEncoder* pCoder, const SVClrTsmaExpWndsRsp* pReq);
|
|
||||||
int32_t tDecodeSVClrTsmaExpWndsRsp(SDecoder* pCoder, SVClrTsmaExpWndsRsp* pReq);
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int idx;
|
int idx;
|
||||||
} SMCreateFullTextReq;
|
} SMCreateFullTextReq;
|
||||||
|
|
|
@ -190,8 +190,6 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_GET_TSMA_EXP_WNDS, "vnode-get-tsma-expired-windows", SVGetTsmaExpWndsReq, SVGetTsmaExpWndsRsp)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CLR_TSMA_EXP_WNDS, "vnode-clr-tsma-expired-windows", SVClrTsmaExpWndsReq, SVClrTsmaExpWndsRsp)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL)
|
||||||
|
|
|
@ -3877,7 +3877,6 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
|
||||||
if (tEncodeCStr(pCoder, pSma->indexName) < 0) return -1;
|
if (tEncodeCStr(pCoder, pSma->indexName) < 0) return -1;
|
||||||
if (tEncodeI32(pCoder, pSma->exprLen) < 0) return -1;
|
if (tEncodeI32(pCoder, pSma->exprLen) < 0) return -1;
|
||||||
if (tEncodeI32(pCoder, pSma->tagsFilterLen) < 0) return -1;
|
if (tEncodeI32(pCoder, pSma->tagsFilterLen) < 0) return -1;
|
||||||
if (tEncodeI32(pCoder, pSma->numOfVgroups) < 0) return -1;
|
|
||||||
if (tEncodeI64(pCoder, pSma->indexUid) < 0) return -1;
|
if (tEncodeI64(pCoder, pSma->indexUid) < 0) return -1;
|
||||||
if (tEncodeI64(pCoder, pSma->tableUid) < 0) return -1;
|
if (tEncodeI64(pCoder, pSma->tableUid) < 0) return -1;
|
||||||
if (tEncodeI64(pCoder, pSma->dstTbUid) < 0) return -1;
|
if (tEncodeI64(pCoder, pSma->dstTbUid) < 0) return -1;
|
||||||
|
@ -3892,22 +3891,8 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
|
||||||
if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1;
|
if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSma->numOfVgroups) { // only needed in dstVgroup
|
tEncodeSSchemaWrapper(pCoder, &pSma->schemaRow);
|
||||||
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
|
tEncodeSSchemaWrapper(pCoder, &pSma->schemaTag);
|
||||||
if (tEncodeI32(pCoder, pSma->pVgEpSet[v].vgId) < 0) return -1;
|
|
||||||
if (tEncodeI8(pCoder, pSma->pVgEpSet[v].epSet.inUse) < 0) return -1;
|
|
||||||
int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps;
|
|
||||||
if (tEncodeI8(pCoder, numOfEps) < 0) return -1;
|
|
||||||
for (int32_t n = 0; n < numOfEps; ++n) {
|
|
||||||
const SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n];
|
|
||||||
if (tEncodeCStr(pCoder, pEp->fqdn) < 0) return -1;
|
|
||||||
if (tEncodeU16(pCoder, pEp->port) < 0) return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tEncodeSSchemaWrapper(pCoder, &pSma->schemaRow);
|
|
||||||
tEncodeSSchemaWrapper(pCoder, &pSma->schemaTag);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -3921,7 +3906,6 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
|
||||||
if (tDecodeCStrTo(pCoder, pSma->indexName) < 0) return -1;
|
if (tDecodeCStrTo(pCoder, pSma->indexName) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pSma->exprLen) < 0) return -1;
|
if (tDecodeI32(pCoder, &pSma->exprLen) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pSma->tagsFilterLen) < 0) return -1;
|
if (tDecodeI32(pCoder, &pSma->tagsFilterLen) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pSma->numOfVgroups) < 0) return -1;
|
|
||||||
if (tDecodeI64(pCoder, &pSma->indexUid) < 0) return -1;
|
if (tDecodeI64(pCoder, &pSma->indexUid) < 0) return -1;
|
||||||
if (tDecodeI64(pCoder, &pSma->tableUid) < 0) return -1;
|
if (tDecodeI64(pCoder, &pSma->tableUid) < 0) return -1;
|
||||||
if (tDecodeI64(pCoder, &pSma->dstTbUid) < 0) return -1;
|
if (tDecodeI64(pCoder, &pSma->dstTbUid) < 0) return -1;
|
||||||
|
@ -3939,30 +3923,9 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
|
||||||
} else {
|
} else {
|
||||||
pSma->tagsFilter = NULL;
|
pSma->tagsFilter = NULL;
|
||||||
}
|
}
|
||||||
if (pSma->numOfVgroups > 0) { // only needed in dstVgroup
|
// only needed in dstVgroup
|
||||||
pSma->pVgEpSet = (SVgEpSet *)tDecoderMalloc(pCoder, pSma->numOfVgroups * sizeof(SVgEpSet));
|
tDecodeSSchemaWrapperEx(pCoder, &pSma->schemaRow);
|
||||||
if (!pSma->pVgEpSet) {
|
tDecodeSSchemaWrapperEx(pCoder, &pSma->schemaTag);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
memset(pSma->pVgEpSet, 0, pSma->numOfVgroups * sizeof(SVgEpSet));
|
|
||||||
|
|
||||||
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
|
|
||||||
if (tDecodeI32(pCoder, &pSma->pVgEpSet[v].vgId) < 0) return -1;
|
|
||||||
if (tDecodeI8(pCoder, &pSma->pVgEpSet[v].epSet.inUse) < 0) return -1;
|
|
||||||
if (tDecodeI8(pCoder, &pSma->pVgEpSet[v].epSet.numOfEps) < 0) return -1;
|
|
||||||
int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps;
|
|
||||||
for (int32_t n = 0; n < numOfEps; ++n) {
|
|
||||||
SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n];
|
|
||||||
if (tDecodeCStrTo(pCoder, pEp->fqdn) < 0) return -1;
|
|
||||||
if (tDecodeU16(pCoder, &pEp->port) < 0) return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tDecodeSSchemaWrapperEx(pCoder, &pSma->schemaRow);
|
|
||||||
tDecodeSSchemaWrapperEx(pCoder, &pSma->schemaTag);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -4005,98 +3968,6 @@ int32_t tDecodeSVDropTSmaReq(SDecoder *pCoder, SVDropTSmaReq *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder *pCoder, const SVGetTsmaExpWndsReq *pReq) {
|
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
|
||||||
|
|
||||||
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
|
|
||||||
if (tEncodeI64(pCoder, pReq->queryWindow.skey) < 0) return -1;
|
|
||||||
if (tEncodeI64(pCoder, pReq->queryWindow.ekey) < 0) return -1;
|
|
||||||
|
|
||||||
tEndEncode(pCoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder *pCoder, SVGetTsmaExpWndsReq *pReq) {
|
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
|
||||||
|
|
||||||
if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1;
|
|
||||||
if (tDecodeI64(pCoder, &pReq->queryWindow.skey) < 0) return -1;
|
|
||||||
if (tDecodeI64(pCoder, &pReq->queryWindow.ekey) < 0) return -1;
|
|
||||||
|
|
||||||
tEndDecode(pCoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tEncodeSVGetTSmaExpWndsRsp(SEncoder *pCoder, const SVGetTsmaExpWndsRsp *pReq) {
|
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
|
||||||
|
|
||||||
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
|
|
||||||
if (tEncodeI8(pCoder, pReq->flags) < 0) return -1;
|
|
||||||
if (tEncodeI32(pCoder, pReq->numExpWnds) < 0) return -1;
|
|
||||||
for (int32_t i = 0; i < pReq->numExpWnds; ++i) {
|
|
||||||
if (tEncodeI64(pCoder, pReq->wndSKeys[i]) < 0) return -1;
|
|
||||||
}
|
|
||||||
tEndEncode(pCoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder *pCoder, SVGetTsmaExpWndsRsp *pReq) {
|
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
|
||||||
|
|
||||||
if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1;
|
|
||||||
if (tDecodeI8(pCoder, &pReq->flags) < 0) return -1;
|
|
||||||
if (tDecodeI32(pCoder, &pReq->numExpWnds) < 0) return -1;
|
|
||||||
for (int32_t i = 0; i < pReq->numExpWnds; ++i) {
|
|
||||||
if (tDecodeI64(pCoder, &pReq->wndSKeys[i]) < 0) return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndDecode(pCoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tEncodeSVClrTsmaExpWndsReq(SEncoder *pCoder, const SVClrTsmaExpWndsReq *pReq) {
|
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
|
|
||||||
if (tEncodeI64(pCoder, pReq->version) < 0) return -1;
|
|
||||||
if (tEncodeI64v(pCoder, pReq->nItems) < 0) return -1;
|
|
||||||
for (int64_t n = 0; pReq->nItems; ++n) {
|
|
||||||
if (tEncodeI64v(pCoder, pReq->items[n].nKeys) < 0) return -1;
|
|
||||||
if (tEncodeI64(pCoder, pReq->items[n].skey) < 0) return -1;
|
|
||||||
}
|
|
||||||
tEndEncode(pCoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeSVClrTsmaExpWndsReq(SDecoder *pCoder, SVClrTsmaExpWndsReq *pReq) {
|
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1;
|
|
||||||
if (tDecodeI64(pCoder, &pReq->version) < 0) return -1;
|
|
||||||
if (tDecodeI64v(pCoder, &pReq->nItems) < 0) return -1;
|
|
||||||
|
|
||||||
for (int64_t i = 0; i < pReq->nItems; ++i) {
|
|
||||||
if (tDecodeI64v(pCoder, &pReq->items[i].nKeys) < 0) return -1;
|
|
||||||
if (tDecodeI64(pCoder, &pReq->items[i].skey) < 0) return -1;
|
|
||||||
}
|
|
||||||
tEndDecode(pCoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tEncodeSVClrTsmaExpWndsRsp(SEncoder *pCoder, const SVClrTsmaExpWndsRsp *pReq) {
|
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1;
|
|
||||||
if (tEncodeI32v(pCoder, pReq->code) < 0) return -1;
|
|
||||||
tEndEncode(pCoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeSVClrTsmaExpWndsRsp(SDecoder *pCoder, SVClrTsmaExpWndsRsp *pReq) {
|
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1;
|
|
||||||
if (tDecodeI32v(pCoder, &pReq->code) < 0) return -1;
|
|
||||||
tEndDecode(pCoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
|
int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
|
||||||
int32_t headLen = sizeof(SMsgHead);
|
int32_t headLen = sizeof(SMsgHead);
|
||||||
if (buf != NULL) {
|
if (buf != NULL) {
|
||||||
|
|
|
@ -347,7 +347,6 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CLR_TSMA_EXP_WNDS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -318,12 +318,10 @@ typedef struct {
|
||||||
int32_t tagsFilterLen;
|
int32_t tagsFilterLen;
|
||||||
int32_t sqlLen;
|
int32_t sqlLen;
|
||||||
int32_t astLen;
|
int32_t astLen;
|
||||||
int32_t numOfVgroups; // for dstVgroup
|
|
||||||
char* expr;
|
char* expr;
|
||||||
char* tagsFilter;
|
char* tagsFilter;
|
||||||
char* sql;
|
char* sql;
|
||||||
char* ast;
|
char* ast;
|
||||||
SVgEpSet* pVgEpSet; // for dstVgroup
|
|
||||||
SSchemaWrapper schemaRow; // for dstVgroup
|
SSchemaWrapper schemaRow; // for dstVgroup
|
||||||
SSchemaWrapper schemaTag; // for dstVgroup
|
SSchemaWrapper schemaTag; // for dstVgroup
|
||||||
} SSmaObj;
|
} SSmaObj;
|
||||||
|
|
|
@ -272,8 +272,6 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm
|
||||||
req.sliding = pSma->sliding;
|
req.sliding = pSma->sliding;
|
||||||
req.expr = pSma->expr;
|
req.expr = pSma->expr;
|
||||||
req.tagsFilter = pSma->tagsFilter;
|
req.tagsFilter = pSma->tagsFilter;
|
||||||
req.numOfVgroups = pSma->numOfVgroups;
|
|
||||||
req.pVgEpSet = pSma->pVgEpSet;
|
|
||||||
req.schemaRow = pSma->schemaRow;
|
req.schemaRow = pSma->schemaRow;
|
||||||
req.schemaTag = pSma->schemaTag;
|
req.schemaTag = pSma->schemaTag;
|
||||||
req.dstTbName = pSma->dstTbName;
|
req.dstTbName = pSma->dstTbName;
|
||||||
|
@ -435,7 +433,6 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
|
||||||
// todo add sma info here
|
// todo add sma info here
|
||||||
#if 1
|
|
||||||
SNode *pAst = NULL;
|
SNode *pAst = NULL;
|
||||||
if (nodesStringToNode(pSma->ast, &pAst) < 0) {
|
if (nodesStringToNode(pSma->ast, &pAst) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -452,7 +449,6 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
|
||||||
pSma->schemaTag.version = 1;
|
pSma->schemaTag.version = 1;
|
||||||
pSma->schemaTag.pSchema = taosMemoryCalloc(1, sizeof(SSchema));
|
pSma->schemaTag.pSchema = taosMemoryCalloc(1, sizeof(SSchema));
|
||||||
if (!pSma->schemaTag.pSchema) {
|
if (!pSma->schemaTag.pSchema) {
|
||||||
nodesDestroyNode(pAst);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pSma->schemaTag.pSchema[0].type = TSDB_DATA_TYPE_BIGINT;
|
pSma->schemaTag.pSchema[0].type = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
@ -461,17 +457,7 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
|
||||||
pSma->schemaTag.pSchema[0].flags = 0;
|
pSma->schemaTag.pSchema[0].flags = 0;
|
||||||
snprintf(pSma->schemaTag.pSchema[0].name, TSDB_COL_NAME_LEN, "groupId");
|
snprintf(pSma->schemaTag.pSchema[0].name, TSDB_COL_NAME_LEN, "groupId");
|
||||||
|
|
||||||
SVgEpSet *pVgEpSet = NULL;
|
|
||||||
int32_t numOfVgroups = 0;
|
|
||||||
if (mndSmaGetVgEpSet(pMnode, pDb, &pVgEpSet, &numOfVgroups) != 0) {
|
|
||||||
nodesDestroyNode(pAst);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
nodesDestroyNode(pAst);
|
|
||||||
|
|
||||||
pSma->pVgEpSet = pVgEpSet;
|
|
||||||
pSma->numOfVgroups = numOfVgroups;
|
|
||||||
#endif
|
|
||||||
int32_t smaContLen = 0;
|
int32_t smaContLen = 0;
|
||||||
void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
|
void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
|
||||||
if (pSmaReq == NULL) return -1;
|
if (pSmaReq == NULL) return -1;
|
||||||
|
@ -501,10 +487,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
memcpy(smaObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN);
|
memcpy(smaObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN);
|
||||||
memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
smaObj.createdTime = taosGetTimestampMs();
|
smaObj.createdTime = taosGetTimestampMs();
|
||||||
#if 0
|
|
||||||
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||||
#endif
|
|
||||||
smaObj.uid = TD_DEBUG_SMA_ID;
|
|
||||||
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
|
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
|
||||||
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "td.tsma.rst.tb.%s", pCreate->name);
|
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "td.tsma.rst.tb.%s", pCreate->name);
|
||||||
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
|
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
|
||||||
|
@ -514,7 +497,6 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
smaObj.intervalUnit = pCreate->intervalUnit;
|
smaObj.intervalUnit = pCreate->intervalUnit;
|
||||||
smaObj.slidingUnit = pCreate->slidingUnit;
|
smaObj.slidingUnit = pCreate->slidingUnit;
|
||||||
smaObj.timezone = pCreate->timezone;
|
smaObj.timezone = pCreate->timezone;
|
||||||
// smaObj.dstVgId = pCreate->dstVgId;
|
|
||||||
smaObj.interval = pCreate->interval;
|
smaObj.interval = pCreate->interval;
|
||||||
smaObj.offset = pCreate->offset;
|
smaObj.offset = pCreate->offset;
|
||||||
smaObj.sliding = pCreate->sliding;
|
smaObj.sliding = pCreate->sliding;
|
||||||
|
@ -547,42 +529,6 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
memcpy(smaObj.ast, pCreate->ast, smaObj.astLen);
|
memcpy(smaObj.ast, pCreate->ast, smaObj.astLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 1 // only for debugging, not needed in common vgroups, only needed in dstVgroup.
|
|
||||||
SNode *pAst = NULL;
|
|
||||||
if (nodesStringToNode(smaObj.ast, &pAst) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (qExtractResultSchema(pAst, &smaObj.schemaRow.nCols, &smaObj.schemaRow.pSchema) != 0) {
|
|
||||||
nodesDestroyNode(pAst);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
smaObj.schemaRow.version = 1;
|
|
||||||
|
|
||||||
smaObj.schemaTag.nCols = 1;
|
|
||||||
smaObj.schemaTag.version = 1;
|
|
||||||
smaObj.schemaTag.pSchema = taosMemoryCalloc(1, sizeof(SSchema));
|
|
||||||
if (!smaObj.schemaTag.pSchema) {
|
|
||||||
nodesDestroyNode(pAst);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
smaObj.schemaTag.pSchema[0].type = TSDB_DATA_TYPE_BIGINT;
|
|
||||||
smaObj.schemaTag.pSchema[0].bytes = TYPE_BYTES[TSDB_DATA_TYPE_BIGINT];
|
|
||||||
smaObj.schemaTag.pSchema[0].colId = smaObj.schemaRow.nCols + PRIMARYKEY_TIMESTAMP_COL_ID;
|
|
||||||
smaObj.schemaTag.pSchema[0].flags = 0;
|
|
||||||
snprintf(smaObj.schemaTag.pSchema[0].name, TSDB_COL_NAME_LEN, "groupId");
|
|
||||||
|
|
||||||
nodesDestroyNode(pAst);
|
|
||||||
|
|
||||||
SVgEpSet *pVgEpSet = NULL;
|
|
||||||
int32_t numOfVgroups = 0;
|
|
||||||
if (mndSmaGetVgEpSet(pMnode, pDb, &pVgEpSet, &numOfVgroups) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
smaObj.pVgEpSet = pVgEpSet;
|
|
||||||
smaObj.numOfVgroups = numOfVgroups;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
SStreamObj streamObj = {0};
|
SStreamObj streamObj = {0};
|
||||||
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||||
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
|
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
|
@ -1169,52 +1115,3 @@ static void mndCancelGetNextSma(SMnode *pMnode, void *pIter) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups) {
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
SVgObj *pVgroup = NULL;
|
|
||||||
void *pIter = NULL;
|
|
||||||
SVgEpSet *pVgEpSet = NULL;
|
|
||||||
int32_t nAllocVgs = 16;
|
|
||||||
int32_t nVgs = 0;
|
|
||||||
|
|
||||||
pVgEpSet = taosMemoryCalloc(nAllocVgs, sizeof(SVgEpSet));
|
|
||||||
if (!pVgEpSet) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
|
||||||
if (pIter == NULL) break;
|
|
||||||
if (pVgroup->dbUid != pDb->uid) {
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nVgs >= nAllocVgs) {
|
|
||||||
void *p = taosMemoryRealloc(pVgEpSet, nAllocVgs * 2 * sizeof(SVgEpSet));
|
|
||||||
if (!p) {
|
|
||||||
taosMemoryFree(pVgEpSet);
|
|
||||||
sdbCancelFetch(pSdb, pIter);
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
pVgEpSet = (SVgEpSet *)p;
|
|
||||||
nAllocVgs *= 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
(pVgEpSet + nVgs)->vgId = pVgroup->vgId;
|
|
||||||
(pVgEpSet + nVgs)->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
|
||||||
|
|
||||||
++nVgs;
|
|
||||||
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
}
|
|
||||||
|
|
||||||
*ppVgEpSet = pVgEpSet;
|
|
||||||
*numOfVgroups = nVgs;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
|
@ -28,7 +28,6 @@ target_sources(
|
||||||
|
|
||||||
# sma
|
# sma
|
||||||
"src/sma/sma.c"
|
"src/sma/sma.c"
|
||||||
"src/sma/smaTDBImpl.c"
|
|
||||||
"src/sma/smaEnv.c"
|
"src/sma/smaEnv.c"
|
||||||
"src/sma/smaOpen.c"
|
"src/sma/smaOpen.c"
|
||||||
"src/sma/smaRollup.c"
|
"src/sma/smaRollup.c"
|
||||||
|
|
|
@ -43,35 +43,17 @@ typedef struct SRSmaInfo SRSmaInfo;
|
||||||
struct SSmaEnv {
|
struct SSmaEnv {
|
||||||
TdThreadRwlock lock;
|
TdThreadRwlock lock;
|
||||||
int8_t type;
|
int8_t type;
|
||||||
TXN txn;
|
|
||||||
void *pPool; // SPoolMem
|
|
||||||
SDiskID did;
|
|
||||||
TDB *dbEnv; // TODO: If it's better to put it in smaIndex level?
|
|
||||||
char *path; // relative path
|
|
||||||
SSmaStat *pStat;
|
SSmaStat *pStat;
|
||||||
};
|
};
|
||||||
|
|
||||||
#define SMA_ENV_LOCK(env) ((env)->lock)
|
#define SMA_ENV_LOCK(env) ((env)->lock)
|
||||||
#define SMA_ENV_TYPE(env) ((env)->type)
|
#define SMA_ENV_TYPE(env) ((env)->type)
|
||||||
#define SMA_ENV_DID(env) ((env)->did)
|
|
||||||
#define SMA_ENV_ENV(env) ((env)->dbEnv)
|
|
||||||
#define SMA_ENV_PATH(env) ((env)->path)
|
|
||||||
#define SMA_ENV_STAT(env) ((env)->pStat)
|
#define SMA_ENV_STAT(env) ((env)->pStat)
|
||||||
#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
|
#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
|
||||||
|
|
||||||
struct SSmaStatItem {
|
struct SSmaStatItem {
|
||||||
/**
|
int8_t state; // ETsdbSmaStat
|
||||||
* @brief The field 'state' is here to demonstrate if one smaIndex is ready to provide service.
|
STSma *pTSma; // cache schema
|
||||||
* - TSDB_SMA_STAT_OK: 1) The sma calculation of history data is finished; 2) Or recevied information from
|
|
||||||
* Streaming Module or TSDB local persistence.
|
|
||||||
* - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
|
|
||||||
* without information about its previous state.
|
|
||||||
* - TSDB_SMA_STAT_DROPPED: 1)sma dropped
|
|
||||||
* N.B. only applicable to tsma
|
|
||||||
*/
|
|
||||||
int8_t state; // ETsdbSmaStat
|
|
||||||
SHashObj *expireWindows; // key: skey of time window, value: version
|
|
||||||
STSma *pTSma; // cache schema
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SSmaStat {
|
struct SSmaStat {
|
||||||
|
@ -84,29 +66,6 @@ struct SSmaStat {
|
||||||
#define SMA_STAT_ITEMS(s) ((s)->smaStatItems)
|
#define SMA_STAT_ITEMS(s) ((s)->smaStatItems)
|
||||||
#define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash)
|
#define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash)
|
||||||
|
|
||||||
struct SSmaKey {
|
|
||||||
TSKEY skey;
|
|
||||||
int64_t groupId;
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef struct SDBFile SDBFile;
|
|
||||||
|
|
||||||
struct SDBFile {
|
|
||||||
int32_t fid;
|
|
||||||
TTB *pDB;
|
|
||||||
char *path;
|
|
||||||
};
|
|
||||||
|
|
||||||
int32_t tdSmaBeginCommit(SSmaEnv *pEnv);
|
|
||||||
int32_t tdSmaEndCommit(SSmaEnv *pEnv);
|
|
||||||
|
|
||||||
int32_t smaOpenDBEnv(TDB **ppEnv, const char *path);
|
|
||||||
int32_t smaCloseDBEnv(TDB *pEnv);
|
|
||||||
int32_t smaOpenDBF(TDB *pEnv, SDBFile *pDBF);
|
|
||||||
int32_t smaCloseDBF(SDBFile *pDBF);
|
|
||||||
int32_t smaSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn);
|
|
||||||
void *smaGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_t *valLen);
|
|
||||||
|
|
||||||
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
|
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
|
||||||
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
|
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -114,13 +73,6 @@ int32_t tbGetTSmaStatus(SSma *pSma, STSma *param, void *result);
|
||||||
int32_t tbRemoveTSmaData(SSma *pSma, STSma *param, STimeWindow *pWin);
|
int32_t tbRemoveTSmaData(SSma *pSma, STSma *param, STimeWindow *pWin);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tdEncodeTSmaKey(int64_t groupId, TSKEY tsKey, void **pData) {
|
|
||||||
int32_t len = 0;
|
|
||||||
len += taosEncodeFixedI64(pData, tsKey);
|
|
||||||
len += taosEncodeFixedI64(pData, groupId);
|
|
||||||
return len;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tdInitSma(SSma *pSma);
|
int32_t tdInitSma(SSma *pSma);
|
||||||
int32_t tdDropTSma(SSma *pSma, char *pMsg);
|
int32_t tdDropTSma(SSma *pSma, char *pMsg);
|
||||||
int32_t tdDropTSmaData(SSma *pSma, int64_t indexUid);
|
int32_t tdDropTSmaData(SSma *pSma, int64_t indexUid);
|
||||||
|
@ -133,8 +85,6 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType, bool onlyCheck);
|
||||||
int32_t tdLockSma(SSma *pSma);
|
int32_t tdLockSma(SSma *pSma);
|
||||||
int32_t tdUnLockSma(SSma *pSma);
|
int32_t tdUnLockSma(SSma *pSma);
|
||||||
|
|
||||||
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
|
|
||||||
|
|
||||||
static FORCE_INLINE int16_t tdTSmaAdd(SSma *pSma, int16_t n) { return atomic_add_fetch_16(&SMA_TSMA_NUM(pSma), n); }
|
static FORCE_INLINE int16_t tdTSmaAdd(SSma *pSma, int16_t n) { return atomic_add_fetch_16(&SMA_TSMA_NUM(pSma), n); }
|
||||||
static FORCE_INLINE int16_t tdTSmaSub(SSma *pSma, int16_t n) { return atomic_sub_fetch_16(&SMA_TSMA_NUM(pSma), n); }
|
static FORCE_INLINE int16_t tdTSmaSub(SSma *pSma, int16_t n) { return atomic_sub_fetch_16(&SMA_TSMA_NUM(pSma), n); }
|
||||||
|
|
||||||
|
@ -219,12 +169,8 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SDisk
|
||||||
void *tdFreeRSmaInfo(SRSmaInfo *pInfo);
|
void *tdFreeRSmaInfo(SRSmaInfo *pInfo);
|
||||||
|
|
||||||
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
|
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
|
||||||
int32_t tdUpdateExpireWindowImpl(SSma *pSma, const SSubmitReq *pMsg, int64_t version);
|
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
|
||||||
int32_t tdClearExpireWindowImpl(SSma *pSma, const SVClrTsmaExpWndsReq *pMsg);
|
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
||||||
// TODO: This is the basic params, and should wrap the params to a queryHandle.
|
|
||||||
int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult);
|
|
||||||
|
|
||||||
int32_t tdGetTSmaDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -150,8 +150,6 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t smaOpen(SVnode* pVnode);
|
int32_t smaOpen(SVnode* pVnode);
|
||||||
int32_t smaClose(SSma* pSma);
|
int32_t smaClose(SSma* pSma);
|
||||||
|
|
||||||
int32_t tdUpdateExpireWindow(SSma* pSma, const SSubmitReq* pMsg, int64_t version);
|
|
||||||
int32_t tdClearExpireWindow(SSma* pSma, const SVClrTsmaExpWndsReq* pMsg);
|
|
||||||
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
||||||
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
||||||
|
|
||||||
|
|
|
@ -36,32 +36,9 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tdUpdateExpireWindow(SSma* pSma, const SSubmitReq* pMsg, int64_t version) {
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
if ((code = tdUpdateExpireWindowImpl(pSma, pMsg, version)) < 0) {
|
|
||||||
smaWarn("vgId:%d, update expire window failed since %s", SMA_VID(pSma), tstrerror(terrno));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
int32_t tdClearExpireWindow(SSma* pSma, const SVClrTsmaExpWndsReq* pMsg) {
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
if ((code = tdClearExpireWindowImpl(pSma, pMsg)) < 0) {
|
|
||||||
smaWarn("vgId:%d, update expire window failed since %s", SMA_VID(pSma), tstrerror(terrno));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tdGetTSmaData(SSma* pSma, char* pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) {
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
if ((code = tdGetTSmaDataImpl(pSma, pData, indexUid, querySKey, nMaxResult)) < 0) {
|
|
||||||
smaWarn("vgId:%d, get tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t smaGetTSmaDays(SVnodeCfg* pCfg, void* pCont, uint32_t contLen, int32_t* days) {
|
int32_t smaGetTSmaDays(SVnodeCfg* pCfg, void* pCont, uint32_t contLen, int32_t* days) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if ((code = tdGetTSmaDaysImpl(pCfg, pCont, contLen, days)) < 0) {
|
if ((code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days)) < 0) {
|
||||||
smaWarn("vgId:%d, get tsma days failed since %s", pCfg->vgId, tstrerror(terrno));
|
smaWarn("vgId:%d, get tsma days failed since %s", pCfg->vgId, tstrerror(terrno));
|
||||||
}
|
}
|
||||||
smaDebug("vgId:%d, get tsma days %d", pCfg->vgId, *days);
|
smaDebug("vgId:%d, get tsma days %d", pCfg->vgId, *days);
|
||||||
|
|
|
@ -151,31 +151,11 @@ static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path,
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(path && (strlen(path) > 0));
|
|
||||||
SMA_ENV_PATH(pEnv) = strdup(path);
|
|
||||||
if (!SMA_ENV_PATH(pEnv)) {
|
|
||||||
tdFreeSmaEnv(pEnv);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SMA_ENV_DID(pEnv) = did;
|
|
||||||
|
|
||||||
if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType) != TSDB_CODE_SUCCESS) {
|
if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType) != TSDB_CODE_SUCCESS) {
|
||||||
tdFreeSmaEnv(pEnv);
|
tdFreeSmaEnv(pEnv);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
char aname[TSDB_FILENAME_LEN] = {0};
|
|
||||||
tfsAbsoluteName(SMA_TFS(pSma), did, path, aname);
|
|
||||||
if (smaOpenDBEnv(&pEnv->dbEnv, aname) != TSDB_CODE_SUCCESS) {
|
|
||||||
tdFreeSmaEnv(pEnv);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!(pEnv->pPool = openPool())) {
|
|
||||||
tdFreeSmaEnv(pEnv);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return pEnv;
|
return pEnv;
|
||||||
}
|
}
|
||||||
|
@ -205,10 +185,7 @@ void tdDestroySmaEnv(SSmaEnv *pSmaEnv) {
|
||||||
if (pSmaEnv) {
|
if (pSmaEnv) {
|
||||||
tdDestroySmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv));
|
tdDestroySmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv));
|
||||||
taosMemoryFreeClear(pSmaEnv->pStat);
|
taosMemoryFreeClear(pSmaEnv->pStat);
|
||||||
taosMemoryFreeClear(pSmaEnv->path);
|
|
||||||
taosThreadRwlockDestroy(&(pSmaEnv->lock));
|
taosThreadRwlockDestroy(&(pSmaEnv->lock));
|
||||||
smaCloseDBEnv(pSmaEnv->dbEnv);
|
|
||||||
closePool(pSmaEnv->pPool);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -280,7 +257,6 @@ void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
|
||||||
if (pSmaStatItem) {
|
if (pSmaStatItem) {
|
||||||
tDestroyTSma(pSmaStatItem->pTSma);
|
tDestroyTSma(pSmaStatItem->pTSma);
|
||||||
taosMemoryFreeClear(pSmaStatItem->pTSma);
|
taosMemoryFreeClear(pSmaStatItem->pTSma);
|
||||||
taosHashCleanup(pSmaStatItem->expireWindows);
|
|
||||||
taosMemoryFreeClear(pSmaStatItem);
|
taosMemoryFreeClear(pSmaStatItem);
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -399,63 +375,3 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType, bool onlyCheck) {
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t tdSmaBeginCommit(SSmaEnv *pEnv) {
|
|
||||||
TXN *pTxn = &pEnv->txn;
|
|
||||||
// start a new txn
|
|
||||||
tdbTxnOpen(pTxn, 0, poolMalloc, poolFree, pEnv->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
|
||||||
if (tdbBegin(pEnv->dbEnv, pTxn) != 0) {
|
|
||||||
smaWarn("tdSma tdb begin commit fail");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tdSmaEndCommit(SSmaEnv *pEnv) {
|
|
||||||
TXN *pTxn = &pEnv->txn;
|
|
||||||
|
|
||||||
// Commit current txn
|
|
||||||
if (tdbCommit(pEnv->dbEnv, pTxn) != 0) {
|
|
||||||
smaWarn("tdSma tdb end commit fail");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
tdbTxnClose(pTxn);
|
|
||||||
clearPool(pEnv->pPool);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
/**
|
|
||||||
* @brief Get the start TS key of the last data block of one interval/sliding.
|
|
||||||
*
|
|
||||||
* @param pSma
|
|
||||||
* @param param
|
|
||||||
* @param result
|
|
||||||
* @return int32_t
|
|
||||||
* 1) Return 0 and fill the result if the check procedure is normal;
|
|
||||||
* 2) Return -1 if error occurs during the check procedure.
|
|
||||||
*/
|
|
||||||
int32_t tdGetTSmaStatus(SSma *pSma, void *smaIndex, void *result) {
|
|
||||||
const char *procedure = "";
|
|
||||||
if (strncmp(procedure, "get the start TS key of the last data block", 100) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
// fill the result
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Remove the tSma data files related to param between pWin.
|
|
||||||
*
|
|
||||||
* @param pSma
|
|
||||||
* @param param
|
|
||||||
* @param pWin
|
|
||||||
* @return int32_t
|
|
||||||
*/
|
|
||||||
int32_t tdRemoveTSmaData(SSma *pSma, void *smaIndex, STimeWindow *pWin) {
|
|
||||||
// for ("tSmaFiles of param-interval-sliding between pWin") {
|
|
||||||
// // remove the tSmaFile
|
|
||||||
// }
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
|
@ -1,130 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#define ALLOW_FORBID_FUNC
|
|
||||||
|
|
||||||
#include "sma.h"
|
|
||||||
|
|
||||||
int32_t smaOpenDBEnv(TDB **ppEnv, const char *path) {
|
|
||||||
int ret = 0;
|
|
||||||
|
|
||||||
if (path == NULL) return -1;
|
|
||||||
|
|
||||||
ret = tdbOpen(path, 4096, 256, ppEnv); // use as param
|
|
||||||
|
|
||||||
if (ret != 0) {
|
|
||||||
smaError("failed to create tsdb db env, ret = %d", ret);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t smaCloseDBEnv(TDB *pEnv) { return tdbClose(pEnv); }
|
|
||||||
|
|
||||||
static inline int tdSmaKeyCmpr(const void *arg1, int len1, const void *arg2, int len2) {
|
|
||||||
const SSmaKey *pKey1 = (const SSmaKey *)arg1;
|
|
||||||
const SSmaKey *pKey2 = (const SSmaKey *)arg2;
|
|
||||||
|
|
||||||
ASSERT(len1 == len2 && len1 == sizeof(SSmaKey));
|
|
||||||
|
|
||||||
if (pKey1->skey < pKey2->skey) {
|
|
||||||
return -1;
|
|
||||||
} else if (pKey1->skey > pKey2->skey) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
if (pKey1->groupId < pKey2->groupId) {
|
|
||||||
return -1;
|
|
||||||
} else if (pKey1->groupId > pKey2->groupId) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t smaOpenDBDb(TTB **ppDB, TDB *pEnv, const char *pFName) {
|
|
||||||
tdb_cmpr_fn_t compFunc;
|
|
||||||
|
|
||||||
// Create a database
|
|
||||||
compFunc = tdSmaKeyCmpr;
|
|
||||||
if (tdbTbOpen(pFName, -1, -1, compFunc, pEnv, ppDB) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t smaCloseDBDb(TTB *pDB) { return tdbTbClose(pDB); }
|
|
||||||
|
|
||||||
int32_t smaOpenDBF(TDB *pEnv, SDBFile *pDBF) {
|
|
||||||
// TEnv is shared by a group of SDBFile
|
|
||||||
if (!pEnv || !pDBF) {
|
|
||||||
terrno = TSDB_CODE_INVALID_PTR;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Open DBF
|
|
||||||
if (smaOpenDBDb(&(pDBF->pDB), pEnv, pDBF->path) < 0) {
|
|
||||||
smaError("failed to open DBF: %s", pDBF->path);
|
|
||||||
smaCloseDBDb(pDBF->pDB);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t smaCloseDBF(SDBFile *pDBF) {
|
|
||||||
int32_t ret = 0;
|
|
||||||
if (pDBF->pDB) {
|
|
||||||
ret = smaCloseDBDb(pDBF->pDB);
|
|
||||||
pDBF->pDB = NULL;
|
|
||||||
}
|
|
||||||
taosMemoryFreeClear(pDBF->path);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t smaSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) {
|
|
||||||
int32_t ret;
|
|
||||||
|
|
||||||
printf("save tsma data into %s, keyLen:%d valLen:%d txn:%p\n", pDBF->path, keyLen, valLen, txn);
|
|
||||||
ret = tdbTbUpsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
|
|
||||||
if (ret < 0) {
|
|
||||||
smaError("failed to upsert tsma data into db, ret = %d", ret);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *smaGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_t *valLen) {
|
|
||||||
void *pVal = NULL;
|
|
||||||
int ret;
|
|
||||||
|
|
||||||
ret = tdbTbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
|
|
||||||
|
|
||||||
if (ret < 0) {
|
|
||||||
smaError("failed to get tsma data from db, ret = %d", ret);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(*valLen >= 0);
|
|
||||||
|
|
||||||
// TODO: lock?
|
|
||||||
// TODO: Would the key/value be destoryed during return the data?
|
|
||||||
// TODO: How about the key is updated while value length is changed? The original value buffer would be freed
|
|
||||||
// automatically?
|
|
||||||
|
|
||||||
return pVal;
|
|
||||||
}
|
|
File diff suppressed because it is too large
Load Diff
|
@ -30,54 +30,8 @@ typedef STsdbCfg STSmaKeepCfg;
|
||||||
|
|
||||||
#define SMA_STATE_ITEM_HASH_SLOT 32
|
#define SMA_STATE_ITEM_HASH_SLOT 32
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SSma *pSma;
|
|
||||||
SDBFile dFile;
|
|
||||||
const SArray *pDataBlocks; // sma data
|
|
||||||
int64_t interval; // interval with the precision of DB
|
|
||||||
} STSmaWriteH;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int32_t iter;
|
|
||||||
int32_t fid;
|
|
||||||
} SmaFsIter;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
STsdb *pTsdb;
|
|
||||||
SSma *pSma;
|
|
||||||
SDBFile dFile;
|
|
||||||
int64_t interval; // interval with the precision of DB
|
|
||||||
int32_t blockSize; // size of SMA block item
|
|
||||||
int32_t days;
|
|
||||||
int8_t storageLevel;
|
|
||||||
SmaFsIter smaFsIter;
|
|
||||||
} STSmaReadH;
|
|
||||||
|
|
||||||
typedef enum {
|
|
||||||
SMA_STORAGE_LEVEL_TSDB = 0, // use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f200.tsma
|
|
||||||
SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f1906.tsma
|
|
||||||
} ESmaStorageLevel;
|
|
||||||
|
|
||||||
// static func
|
// static func
|
||||||
|
|
||||||
static int64_t tdGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision, bool adjusted);
|
|
||||||
static int32_t tdGetSmaStorageLevel(STSmaKeepCfg *pCfg, int64_t interval);
|
|
||||||
static int32_t tdInitTSmaWriteH(STSmaWriteH *pSmaH, SSma *pSma, const SArray *pDataBlocks, int64_t interval,
|
|
||||||
int8_t intervalUnit);
|
|
||||||
static int32_t tdInitTSmaReadH(STSmaReadH *pSmaH, SSma *pSma, int64_t interval, int8_t intervalUnit);
|
|
||||||
static void tdDestroyTSmaWriteH(STSmaWriteH *pSmaH);
|
|
||||||
static int32_t tdGetTSmaDays(SSma *pSma, int64_t interval, int32_t storageLevel);
|
|
||||||
static int32_t tdSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t fid);
|
|
||||||
static int32_t tdInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey);
|
|
||||||
static bool tdSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey);
|
|
||||||
static int32_t tdInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyLen, void *pData, int32_t dataLen,
|
|
||||||
TXN *txn);
|
|
||||||
// expire window
|
|
||||||
|
|
||||||
static int32_t tdSetExpireWindow(SSma *pSma, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version);
|
|
||||||
static int32_t tdResetExpireWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUid, TSKEY skey);
|
|
||||||
static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Judge the tsma file split days
|
* @brief Judge the tsma file split days
|
||||||
*
|
*
|
||||||
|
@ -87,7 +41,7 @@ static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid);
|
||||||
* @param days unit is minute
|
* @param days unit is minute
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
int32_t tdGetTSmaDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
|
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
|
||||||
SDecoder coder = {0};
|
SDecoder coder = {0};
|
||||||
tDecoderInit(&coder, pCont, contLen);
|
tDecoderInit(&coder, pCont, contLen);
|
||||||
|
|
||||||
|
@ -130,225 +84,6 @@ _err:
|
||||||
|
|
||||||
// implementation
|
// implementation
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief
|
|
||||||
*
|
|
||||||
* @param pSmaH
|
|
||||||
* @param pSma
|
|
||||||
* @param interval
|
|
||||||
* @param intervalUnit
|
|
||||||
* @return int32_t
|
|
||||||
*/
|
|
||||||
static int32_t tdInitTSmaReadH(STSmaReadH *pSmaH, SSma *pSma, int64_t interval, int8_t intervalUnit) {
|
|
||||||
STSmaKeepCfg *pCfg = SMA_TSDB_CFG(pSma);
|
|
||||||
pSmaH->pSma = pSma;
|
|
||||||
pSmaH->interval = tdGetIntervalByPrecision(interval, intervalUnit, SMA_TSDB_CFG(pSma)->precision, true);
|
|
||||||
pSmaH->storageLevel = tdGetSmaStorageLevel(pCfg, interval);
|
|
||||||
pSmaH->days = tdGetTSmaDays(pSma, pSmaH->interval, pSmaH->storageLevel);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Init of tsma FS
|
|
||||||
*
|
|
||||||
* @param pReadH
|
|
||||||
* @param indexUid
|
|
||||||
* @param skey
|
|
||||||
* @return int32_t
|
|
||||||
*/
|
|
||||||
static int32_t tdInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey) {
|
|
||||||
SSma *pSma = pSmaH->pSma;
|
|
||||||
|
|
||||||
int32_t fid = (int32_t)(TSDB_KEY_FID(skey, pSmaH->days, SMA_TSDB_CFG(pSma)->precision));
|
|
||||||
char tSmaFile[TSDB_FILENAME_LEN] = {0};
|
|
||||||
snprintf(tSmaFile, TSDB_FILENAME_LEN, "%" PRIi64 "%sv%df%d.tsma", indexUid, TD_DIRSEP, SMA_VID(pSma), fid);
|
|
||||||
pSmaH->dFile.path = strdup(tSmaFile);
|
|
||||||
pSmaH->smaFsIter.iter = 0;
|
|
||||||
pSmaH->smaFsIter.fid = fid;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Set and open tsma file if it has key locates in queryWin.
|
|
||||||
*
|
|
||||||
* @param pReadH
|
|
||||||
* @param param
|
|
||||||
* @param queryWin
|
|
||||||
* @return true
|
|
||||||
* @return false
|
|
||||||
*/
|
|
||||||
static bool tdSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
|
|
||||||
// SArray *smaFs = pReadH->pTsdb->fs->cstatus->sf;
|
|
||||||
// int32_t nSmaFs = taosArrayGetSize(smaFs);
|
|
||||||
|
|
||||||
smaCloseDBF(&pReadH->dFile);
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
while (pReadH->smaFsIter.iter < nSmaFs) {
|
|
||||||
void *pSmaFile = taosArrayGet(smaFs, pReadH->smaFsIter.iter);
|
|
||||||
if (pSmaFile) { // match(indexName, queryWindow)
|
|
||||||
// TODO: select the file by index_name ...
|
|
||||||
pReadH->dFile = pSmaFile;
|
|
||||||
++pReadH->smaFsIter.iter;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
++pReadH->smaFsIter.iter;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pReadH->pDFile) {
|
|
||||||
tdDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]");
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Approximate value for week/month/year.
|
|
||||||
*
|
|
||||||
* @param interval
|
|
||||||
* @param intervalUnit
|
|
||||||
* @param precision
|
|
||||||
* @param adjusted Interval already adjusted according to DB precision
|
|
||||||
* @return int64_t
|
|
||||||
*/
|
|
||||||
static int64_t tdGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision, bool adjusted) {
|
|
||||||
if (adjusted) {
|
|
||||||
return interval;
|
|
||||||
}
|
|
||||||
|
|
||||||
switch (intervalUnit) {
|
|
||||||
case TIME_UNIT_YEAR: // approximate value
|
|
||||||
interval *= 365 * 86400 * 1e3;
|
|
||||||
break;
|
|
||||||
case TIME_UNIT_MONTH: // approximate value
|
|
||||||
interval *= 30 * 86400 * 1e3;
|
|
||||||
break;
|
|
||||||
case TIME_UNIT_WEEK: // approximate value
|
|
||||||
interval *= 7 * 86400 * 1e3;
|
|
||||||
break;
|
|
||||||
case TIME_UNIT_DAY: // the interval for tSma calculation must <= day
|
|
||||||
interval *= 86400 * 1e3;
|
|
||||||
break;
|
|
||||||
case TIME_UNIT_HOUR:
|
|
||||||
interval *= 3600 * 1e3;
|
|
||||||
break;
|
|
||||||
case TIME_UNIT_MINUTE:
|
|
||||||
interval *= 60 * 1e3;
|
|
||||||
break;
|
|
||||||
case TIME_UNIT_SECOND:
|
|
||||||
interval *= 1e3;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
switch (precision) {
|
|
||||||
case TSDB_TIME_PRECISION_MILLI:
|
|
||||||
if (TIME_UNIT_MICROSECOND == intervalUnit) { // us
|
|
||||||
return interval / 1e3;
|
|
||||||
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // nano second
|
|
||||||
return interval / 1e6;
|
|
||||||
} else { // ms
|
|
||||||
return interval;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case TSDB_TIME_PRECISION_MICRO:
|
|
||||||
if (TIME_UNIT_MICROSECOND == intervalUnit) { // us
|
|
||||||
return interval;
|
|
||||||
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // ns
|
|
||||||
return interval / 1e3;
|
|
||||||
} else { // ms
|
|
||||||
return interval * 1e3;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case TSDB_TIME_PRECISION_NANO:
|
|
||||||
if (TIME_UNIT_MICROSECOND == intervalUnit) { // us
|
|
||||||
return interval * 1e3;
|
|
||||||
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // ns
|
|
||||||
return interval;
|
|
||||||
} else { // ms
|
|
||||||
return interval * 1e6;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default: // ms
|
|
||||||
if (TIME_UNIT_MICROSECOND == intervalUnit) { // us
|
|
||||||
return interval / 1e3;
|
|
||||||
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // ns
|
|
||||||
return interval / 1e6;
|
|
||||||
} else { // ms
|
|
||||||
return interval;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
return interval;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tdInitTSmaWriteH(STSmaWriteH *pSmaH, SSma *pSma, const SArray *pDataBlocks, int64_t interval,
|
|
||||||
int8_t intervalUnit) {
|
|
||||||
pSmaH->pSma = pSma;
|
|
||||||
pSmaH->interval = tdGetIntervalByPrecision(interval, intervalUnit, SMA_TSDB_CFG(pSma)->precision, true);
|
|
||||||
pSmaH->pDataBlocks = pDataBlocks;
|
|
||||||
pSmaH->dFile.fid = SMA_IVLD_FID;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void tdDestroyTSmaWriteH(STSmaWriteH *pSmaH) {
|
|
||||||
if (pSmaH) {
|
|
||||||
smaCloseDBF(&pSmaH->dFile);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tdSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t fid) {
|
|
||||||
SSma *pSma = pSmaH->pSma;
|
|
||||||
ASSERT(!pSmaH->dFile.path && !pSmaH->dFile.pDB);
|
|
||||||
|
|
||||||
pSmaH->dFile.fid = fid;
|
|
||||||
char tSmaFile[TSDB_FILENAME_LEN] = {0};
|
|
||||||
snprintf(tSmaFile, TSDB_FILENAME_LEN, "%" PRIi64 "%sv%df%d.tsma", indexUid, TD_DIRSEP, SMA_VID(pSma), fid);
|
|
||||||
pSmaH->dFile.path = strdup(tSmaFile);
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief
|
|
||||||
*
|
|
||||||
* @param pSma
|
|
||||||
* @param interval Interval calculated by DB's precision
|
|
||||||
* @param storageLevel
|
|
||||||
* @return int32_t
|
|
||||||
*/
|
|
||||||
static int32_t tdGetTSmaDays(SSma *pSma, int64_t interval, int32_t storageLevel) {
|
|
||||||
STsdbCfg *pCfg = SMA_TSDB_CFG(pSma);
|
|
||||||
int32_t daysPerFile = pCfg->days; // unit is minute
|
|
||||||
|
|
||||||
if (storageLevel == SMA_STORAGE_LEVEL_TSDB) {
|
|
||||||
int32_t minutes = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerMin[pCfg->precision]);
|
|
||||||
if (minutes > SMA_STORAGE_TSDB_MINUTES) {
|
|
||||||
daysPerFile = SMA_STORAGE_TSDB_MINUTES;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return daysPerFile;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Judge the tsma storage level
|
|
||||||
*
|
|
||||||
* @param pCfg
|
|
||||||
* @param interval
|
|
||||||
* @return int32_t
|
|
||||||
*/
|
|
||||||
static int32_t tdGetSmaStorageLevel(STSmaKeepCfg *pCfg, int64_t interval) {
|
|
||||||
int64_t mInterval = convertTimeFromPrecisionToUnit(interval, pCfg->precision, TIME_UNIT_MINUTE);
|
|
||||||
if (pCfg->days / mInterval >= SMA_STORAGE_SPLIT_FACTOR) {
|
|
||||||
return SMA_STORAGE_LEVEL_DFILESET;
|
|
||||||
}
|
|
||||||
return SMA_STORAGE_LEVEL_TSDB;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Insert/Update Time-range-wise SMA data.
|
* @brief Insert/Update Time-range-wise SMA data.
|
||||||
* - If interval < SMA_STORAGE_SPLIT_HOURS(e.g. 24), save the SMA data as a part of DFileSet to e.g.
|
* - If interval < SMA_STORAGE_SPLIT_HOURS(e.g. 24), save the SMA data as a part of DFileSet to e.g.
|
||||||
|
@ -363,7 +98,7 @@ static int32_t tdGetSmaStorageLevel(STSmaKeepCfg *pCfg, int64_t interval) {
|
||||||
*/
|
*/
|
||||||
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||||
STsdbCfg *pCfg = SMA_TSDB_CFG(pSma);
|
STsdbCfg *pCfg = SMA_TSDB_CFG(pSma);
|
||||||
#if 0
|
|
||||||
const SArray *pDataBlocks = (const SArray *)msg;
|
const SArray *pDataBlocks = (const SArray *)msg;
|
||||||
|
|
||||||
// TODO: destroy SSDataBlocks(msg)
|
// TODO: destroy SSDataBlocks(msg)
|
||||||
|
@ -386,7 +121,6 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||||
smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is empty", SMA_VID(pSma));
|
smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is empty", SMA_VID(pSma));
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
SSmaEnv *pEnv = SMA_TSMA_ENV(pSma);
|
SSmaEnv *pEnv = SMA_TSMA_ENV(pSma);
|
||||||
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
||||||
|
@ -411,285 +145,6 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tdDropTSmaData(SSma *pSma, int64_t indexUid) {
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
if ((code = tdDropTSmaDataImpl(pSma, indexUid)) < 0) {
|
|
||||||
smaWarn("vgId:%d, drop tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Insert TSma data blocks to DB File build by B+Tree
|
|
||||||
*
|
|
||||||
* @param pSmaH
|
|
||||||
* @param smaKey tableUid-colId-skeyOfWindow(8-2-8)
|
|
||||||
* @param keyLen
|
|
||||||
* @param pData
|
|
||||||
* @param dataLen
|
|
||||||
* @return int32_t
|
|
||||||
*/
|
|
||||||
static int32_t tdInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyLen, void *pData, int32_t dataLen,
|
|
||||||
TXN *txn) {
|
|
||||||
SDBFile *pDBFile = &pSmaH->dFile;
|
|
||||||
|
|
||||||
// TODO: insert tsma data blocks into B+Tree(TTB)
|
|
||||||
if (smaSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) {
|
|
||||||
smaWarn("vgId:%d, insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail",
|
|
||||||
SMA_VID(pSmaH->pSma), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
smaDebug("vgId:%d, insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed",
|
|
||||||
SMA_VID(pSmaH->pSma), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
|
|
||||||
|
|
||||||
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
|
|
||||||
uint32_t valueSize = 0;
|
|
||||||
void *data = tdGetSmaDataByKey(pDBFile, smaKey, keyLen, &valueSize);
|
|
||||||
ASSERT(data != NULL);
|
|
||||||
for (uint32_t v = 0; v < valueSize; v += 8) {
|
|
||||||
smaWarn("vgId:%d, insert sma data val[%d] %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v));
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief When sma data received from stream computing, make the relative expire window valid.
|
|
||||||
*
|
|
||||||
* @param pSma
|
|
||||||
* @param pStat
|
|
||||||
* @param indexUid
|
|
||||||
* @param skey
|
|
||||||
* @return int32_t
|
|
||||||
*/
|
|
||||||
static int32_t tdResetExpireWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUid, TSKEY skey) {
|
|
||||||
SSmaStatItem *pItem = NULL;
|
|
||||||
|
|
||||||
tdRefSmaStat(pSma, pStat);
|
|
||||||
|
|
||||||
if (pStat && SMA_STAT_ITEMS(pStat)) {
|
|
||||||
pItem = taosHashGet(SMA_STAT_ITEMS(pStat), &indexUid, sizeof(indexUid));
|
|
||||||
}
|
|
||||||
if ((pItem) && ((pItem = *(SSmaStatItem **)pItem))) {
|
|
||||||
// pItem resides in hash buffer all the time unless drop sma index
|
|
||||||
// TODO: multithread protect
|
|
||||||
if (taosHashRemove(pItem->expireWindows, &skey, sizeof(TSKEY)) != 0) {
|
|
||||||
// error handling
|
|
||||||
tdUnRefSmaStat(pSma, pStat);
|
|
||||||
smaWarn("vgId:%d, remove skey %" PRIi64 " from expire window for sma index %" PRIi64 " fail", SMA_VID(pSma), skey,
|
|
||||||
indexUid);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
smaDebug("vgId:%d, remove skey %" PRIi64 " from expire window for sma index %" PRIi64 " succeed", SMA_VID(pSma),
|
|
||||||
skey, indexUid);
|
|
||||||
// TODO: use a standalone interface to received state upate notification from stream computing module.
|
|
||||||
/**
|
|
||||||
* @brief state
|
|
||||||
* - When SMA env init in TSDB, its status is TSDB_SMA_STAT_OK.
|
|
||||||
* - In startup phase of stream computing module, it should notify the SMA env in TSDB to expired if needed(e.g.
|
|
||||||
* when batch data caculation not finised)
|
|
||||||
* - When TSDB_SMA_STAT_OK, the stream computing module should also notify that to the SMA env in TSDB.
|
|
||||||
*/
|
|
||||||
pItem->state = TSDB_SMA_STAT_OK;
|
|
||||||
} else {
|
|
||||||
// error handling
|
|
||||||
tdUnRefSmaStat(pSma, pStat);
|
|
||||||
smaWarn("vgId:%d, expire window %" PRIi64 " not exists for sma index %" PRIi64, SMA_VID(pSma), skey, indexUid);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
tdUnRefSmaStat(pSma, pStat);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Drop tsma data and local cache
|
|
||||||
* - insert/query reference
|
|
||||||
* @param pSma
|
|
||||||
* @param msg
|
|
||||||
* @return int32_t
|
|
||||||
*/
|
|
||||||
static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid) {
|
|
||||||
SSmaEnv *pEnv = atomic_load_ptr(&SMA_TSMA_ENV(pSma));
|
|
||||||
|
|
||||||
// clear local cache
|
|
||||||
if (pEnv) {
|
|
||||||
smaDebug("vgId:%d, drop tsma local cache for %" PRIi64, SMA_VID(pSma), indexUid);
|
|
||||||
|
|
||||||
SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid));
|
|
||||||
if ((pItem) || ((pItem = *(SSmaStatItem **)pItem))) {
|
|
||||||
if (tdSmaStatIsDropped(pItem)) {
|
|
||||||
smaDebug("vgId:%d, tsma stat is already dropped for %" PRIi64, SMA_VID(pSma), indexUid);
|
|
||||||
return TSDB_CODE_TDB_INVALID_ACTION; // TODO: duplicate drop msg would be intercepted by mnode
|
|
||||||
}
|
|
||||||
|
|
||||||
tdWLockSmaEnv(pEnv);
|
|
||||||
if (tdSmaStatIsDropped(pItem)) {
|
|
||||||
tdUnLockSmaEnv(pEnv);
|
|
||||||
smaDebug("vgId:%d, tsma stat is already dropped for %" PRIi64, SMA_VID(pSma), indexUid);
|
|
||||||
return TSDB_CODE_TDB_INVALID_ACTION; // TODO: duplicate drop msg would be intercepted by mnode
|
|
||||||
}
|
|
||||||
tdSmaStatSetDropped(pItem);
|
|
||||||
tdUnLockSmaEnv(pEnv);
|
|
||||||
|
|
||||||
int32_t nSleep = 0;
|
|
||||||
int32_t refVal = INT32_MAX;
|
|
||||||
while (true) {
|
|
||||||
if ((refVal = T_REF_VAL_GET(SMA_ENV_STAT(pEnv))) <= 0) {
|
|
||||||
smaDebug("vgId:%d, drop index %" PRIi64 " since refVal=%d", SMA_VID(pSma), indexUid, refVal);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
smaDebug("vgId:%d, wait 1s to drop index %" PRIi64 " since refVal=%d", SMA_VID(pSma), indexUid, refVal);
|
|
||||||
taosSsleep(1);
|
|
||||||
if (++nSleep > SMA_DROP_EXPIRED_TIME) {
|
|
||||||
smaDebug("vgId:%d, drop index %" PRIi64 " after wait %d (refVal=%d)", SMA_VID(pSma), indexUid, nSleep,
|
|
||||||
refVal);
|
|
||||||
break;
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
tdFreeSmaStatItem(pItem);
|
|
||||||
smaDebug("vgId:%d, getTSmaDataImpl failed since no index %" PRIi64 " in local cache", SMA_VID(pSma), indexUid);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// clear sma data files
|
|
||||||
// TODO:
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief
|
|
||||||
*
|
|
||||||
* @param pSma Return the data between queryWin and fill the pData.
|
|
||||||
* @param pData
|
|
||||||
* @param indexUid
|
|
||||||
* @param pQuerySKey
|
|
||||||
* @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM.
|
|
||||||
* @return int32_t
|
|
||||||
*/
|
|
||||||
int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) {
|
|
||||||
SSmaEnv *pEnv = atomic_load_ptr(&SMA_TSMA_ENV(pSma));
|
|
||||||
SSmaStat *pStat = NULL;
|
|
||||||
|
|
||||||
if (!pEnv) {
|
|
||||||
terrno = TSDB_CODE_INVALID_PTR;
|
|
||||||
smaWarn("vgId:%d, getTSmaDataImpl failed since pTSmaEnv is NULL", SMA_VID(pSma));
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
pStat = SMA_ENV_STAT(pEnv);
|
|
||||||
|
|
||||||
tdRefSmaStat(pSma, pStat);
|
|
||||||
SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid));
|
|
||||||
if (!pItem || !(pItem = *(SSmaStatItem **)pItem)) {
|
|
||||||
// Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if
|
|
||||||
// it's NULL.
|
|
||||||
tdUnRefSmaStat(pSma, pStat);
|
|
||||||
terrno = TSDB_CODE_TDB_INVALID_ACTION;
|
|
||||||
smaDebug("vgId:%d, getTSmaDataImpl failed since no index %" PRIi64, SMA_VID(pSma), indexUid);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t nQueryWin = taosArrayGetSize(pQuerySKey);
|
|
||||||
for (int32_t n = 0; n < nQueryWin; ++n) {
|
|
||||||
TSKEY skey = taosArrayGet(pQuerySKey, n);
|
|
||||||
if (taosHashGet(pItem->expireWindows, &skey, sizeof(TSKEY))) {
|
|
||||||
// TODO: mark this window as expired.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#if 1
|
|
||||||
int8_t smaStat = 0;
|
|
||||||
if (!tdSmaStatIsOK(pItem, &smaStat)) { // TODO: multiple check for large scale sma query
|
|
||||||
tdUnRefSmaStat(pSma, pStat);
|
|
||||||
terrno = TSDB_CODE_TSMA_INVALID_STAT;
|
|
||||||
smaWarn("vgId:%d, getTSmaDataImpl failed from index %" PRIi64 " since %s %" PRIi8, SMA_VID(pSma), indexUid,
|
|
||||||
tstrerror(terrno), smaStat);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosHashGet(pItem->expireWindows, &querySKey, sizeof(TSKEY))) {
|
|
||||||
// TODO: mark this window as expired.
|
|
||||||
smaDebug("vgId:%d, skey %" PRIi64 " of window exists in expire window for index %" PRIi64, SMA_VID(pSma), querySKey,
|
|
||||||
indexUid);
|
|
||||||
} else {
|
|
||||||
smaDebug("vgId:%d, skey %" PRIi64 " of window not in expire window for index %" PRIi64, SMA_VID(pSma), querySKey,
|
|
||||||
indexUid);
|
|
||||||
}
|
|
||||||
|
|
||||||
STSma *pTSma = pItem->pTSma;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#if 1
|
|
||||||
STSmaReadH tReadH = {0};
|
|
||||||
tdInitTSmaReadH(&tReadH, pSma, pTSma->interval, pTSma->intervalUnit);
|
|
||||||
smaCloseDBF(&tReadH.dFile);
|
|
||||||
|
|
||||||
tdUnRefSmaStat(pSma, pStat);
|
|
||||||
|
|
||||||
tdInitTSmaFile(&tReadH, indexUid, querySKey);
|
|
||||||
smaDebug("### vgId:%d, read from DBF %s days:%d, interval:%" PRIi64 ", storageLevel:%" PRIi8 " queryKey:%" PRIi64,
|
|
||||||
SMA_VID(pSma), tReadH.dFile.path, tReadH.days, tReadH.interval, tReadH.storageLevel, querySKey);
|
|
||||||
if (smaOpenDBF(pEnv->dbEnv, &tReadH.dFile) != 0) {
|
|
||||||
smaWarn("vgId:%d, open DBF %s failed since %s", SMA_VID(pSma), tReadH.dFile.path, tstrerror(terrno));
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
char smaKey[SMA_KEY_LEN] = {0};
|
|
||||||
void *pSmaKey = &smaKey;
|
|
||||||
int64_t queryGroupId = 0;
|
|
||||||
tdEncodeTSmaKey(queryGroupId, querySKey, (void **)&pSmaKey);
|
|
||||||
|
|
||||||
smaDebug("vgId:%d, get sma data from %s: smaKey %" PRIx64 "-%" PRIx64 ", keyLen %d", SMA_VID(pSma), tReadH.dFile.path,
|
|
||||||
*(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), SMA_KEY_LEN);
|
|
||||||
|
|
||||||
void *result = NULL;
|
|
||||||
int32_t valueSize = 0;
|
|
||||||
if (!(result = smaGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize))) {
|
|
||||||
smaWarn("vgId:%d, get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIx64 " since %s",
|
|
||||||
SMA_VID(pSma), indexUid, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), tstrerror(terrno));
|
|
||||||
smaCloseDBF(&tReadH.dFile);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
|
|
||||||
for (uint32_t v = 0; v < valueSize; v += 8) {
|
|
||||||
smaWarn("vgId:%d, get sma data v[%d]=%" PRIi64, SMA_VID(pSma), v, *(int64_t *)POINTER_SHIFT(result, v));
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
taosMemoryFreeClear(result); // TODO: fill the result to output
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t nResult = 0;
|
|
||||||
int64_t lastKey = 0;
|
|
||||||
|
|
||||||
while (true) {
|
|
||||||
if (nResult >= nMaxResult) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// set and open the file according to the STSma param
|
|
||||||
if (tdSetAndOpenTSmaFile(&tReadH, queryWin)) {
|
|
||||||
char bTree[100] = "\0";
|
|
||||||
while (strncmp(bTree, "has more nodes", 100) == 0) {
|
|
||||||
if (nResult >= nMaxResult) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
// tdGetDataFromBTree(bTree, queryWin, lastKey)
|
|
||||||
// fill the pData
|
|
||||||
++nResult;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
// read data from file and fill the result
|
|
||||||
smaCloseDBF(&tReadH.dFile);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
|
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
|
||||||
SSmaCfg *pCfg = (SSmaCfg *)pMsg;
|
SSmaCfg *pCfg = (SSmaCfg *)pMsg;
|
||||||
|
|
||||||
|
@ -713,293 +168,3 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
|
||||||
tdTSmaAdd(pSma, 1);
|
tdTSmaAdd(pSma, 1);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tdDropTSma(SSma *pSma, char *pMsg) {
|
|
||||||
#if 0
|
|
||||||
SVDropTSmaReq vDropSmaReq = {0};
|
|
||||||
if (!tDeserializeSVDropTSmaReq(pMsg, &vDropSmaReq)) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: send msg to stream computing to drop tsma
|
|
||||||
// if ((send msg to stream computing) < 0) {
|
|
||||||
// tDestroyTSma(&vCreateSmaReq);
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
|
|
||||||
if (metaDropTSma(SMA_META(pSma), vDropSmaReq.indexUid) < 0) {
|
|
||||||
// TODO: handle error
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdDropTSmaData(pSma, vDropSmaReq.indexUid) < 0) {
|
|
||||||
// TODO: handle error
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tdTSmaSub(pSma, 1);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// TODO: return directly or go on follow steps?
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static SSmaStatItem *tdNewSmaStatItem(int8_t state) {
|
|
||||||
SSmaStatItem *pItem = NULL;
|
|
||||||
|
|
||||||
pItem = (SSmaStatItem *)taosMemoryCalloc(1, sizeof(SSmaStatItem));
|
|
||||||
if (!pItem) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pItem->state = state;
|
|
||||||
pItem->expireWindows = taosHashInit(SMA_STATE_ITEM_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP),
|
|
||||||
true, HASH_ENTRY_LOCK);
|
|
||||||
if (!pItem->expireWindows) {
|
|
||||||
taosMemoryFreeClear(pItem);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return pItem;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tdSetExpireWindow(SSma *pSma, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version) {
|
|
||||||
SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
|
|
||||||
if (!pItem) {
|
|
||||||
// TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later
|
|
||||||
pItem = tdNewSmaStatItem(TSDB_SMA_STAT_OK); // TODO use the real state
|
|
||||||
if (!pItem) {
|
|
||||||
// Response to stream computing: OOM
|
|
||||||
// For query, if the indexUid not found, the TSDB should tell query module to query raw TS data.
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
// cache smaMeta
|
|
||||||
STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
|
|
||||||
if (!pTSma) {
|
|
||||||
terrno = TSDB_CODE_TSMA_NO_INDEX_IN_META;
|
|
||||||
taosHashCleanup(pItem->expireWindows);
|
|
||||||
taosMemoryFree(pItem);
|
|
||||||
smaWarn("vgId:%d, set expire window, get tsma meta failed for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
|
|
||||||
indexUid, tstrerror(terrno));
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
pItem->pTSma = pTSma;
|
|
||||||
|
|
||||||
if (taosHashPut(pItemsHash, &indexUid, sizeof(indexUid), &pItem, sizeof(pItem)) != 0) {
|
|
||||||
// If error occurs during put smaStatItem, free the resources of pItem
|
|
||||||
taosHashCleanup(pItem->expireWindows);
|
|
||||||
taosMemoryFree(pItem);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
} else if (!(pItem = *(SSmaStatItem **)pItem)) {
|
|
||||||
terrno = TSDB_CODE_INVALID_PTR;
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosHashPut(pItem->expireWindows, &winSKey, sizeof(TSKEY), &version, sizeof(version)) != 0) {
|
|
||||||
// If error occurs during taosHashPut expire windows, remove the smaIndex from pSma->pSmaStat, thus TSDB would
|
|
||||||
// tell query module to query raw TS data.
|
|
||||||
// N.B.
|
|
||||||
// 1) It is assumed to be extemely little probability event of fail to taosHashPut.
|
|
||||||
// 2) This would solve the inconsistency to some extent, but not completely, unless we record all expired
|
|
||||||
// windows failed to put into hash table.
|
|
||||||
taosHashCleanup(pItem->expireWindows);
|
|
||||||
taosMemoryFreeClear(pItem->pTSma);
|
|
||||||
taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid));
|
|
||||||
smaWarn("vgId:%d, smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window fail", SMA_VID(pSma), indexUid,
|
|
||||||
winSKey);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
smaDebug("vgId:%d, smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window succeed", SMA_VID(pSma), indexUid,
|
|
||||||
winSKey);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Update expire window according to msg from stream computing module.
|
|
||||||
*
|
|
||||||
* @param pSma
|
|
||||||
* @param msg SSubmitReq
|
|
||||||
* @return int32_t
|
|
||||||
*/
|
|
||||||
int32_t tdUpdateExpireWindowImpl(SSma *pSma, const SSubmitReq *pMsg, int64_t version) {
|
|
||||||
// no time-range-sma, just return success
|
|
||||||
if (atomic_load_16(&SMA_TSMA_NUM(pSma)) <= 0) {
|
|
||||||
smaTrace("vgId:%d, not update expire window since no tsma", SMA_VID(pSma));
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!SMA_META(pSma)) {
|
|
||||||
terrno = TSDB_CODE_INVALID_PTR;
|
|
||||||
smaError("vgId:%d, update expire window failed since no meta ptr", SMA_VID(pSma));
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE, false) < 0) {
|
|
||||||
smaError("vgId:%d, init tsma env failed since %s", SMA_VID(pSma), terrstr(terrno));
|
|
||||||
terrno = TSDB_CODE_TDB_INIT_FAILED;
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Firstly, assume that tsma can only be created on super table/normal table.
|
|
||||||
// getActiveTimeWindow
|
|
||||||
|
|
||||||
SSmaEnv *pEnv = SMA_TSMA_ENV(pSma);
|
|
||||||
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
|
||||||
SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv);
|
|
||||||
|
|
||||||
TASSERT(pEnv && pStat && pItemsHash);
|
|
||||||
|
|
||||||
// basic procedure
|
|
||||||
// TODO: optimization
|
|
||||||
tdRefSmaStat(pSma, pStat);
|
|
||||||
|
|
||||||
SSubmitMsgIter msgIter = {0};
|
|
||||||
SSubmitBlk *pBlock = NULL;
|
|
||||||
SInterval interval = {0};
|
|
||||||
TSKEY lastWinSKey = INT64_MIN;
|
|
||||||
|
|
||||||
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) {
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
while (true) {
|
|
||||||
tGetSubmitMsgNext(&msgIter, &pBlock);
|
|
||||||
if (!pBlock) break;
|
|
||||||
|
|
||||||
STSmaWrapper *pSW = NULL;
|
|
||||||
STSma *pTSma = NULL;
|
|
||||||
|
|
||||||
SSubmitBlkIter blkIter = {0};
|
|
||||||
if (tInitSubmitBlkIter(&msgIter, pBlock, &blkIter) < 0) {
|
|
||||||
pSW = tFreeTSmaWrapper(pSW, false);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
while (true) {
|
|
||||||
STSRow *row = tGetSubmitBlkNext(&blkIter);
|
|
||||||
if (!row) {
|
|
||||||
pSW = tFreeTSmaWrapper(pSW, false);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (!pSW || (pTSma && (pTSma->tableUid != msgIter.suid))) {
|
|
||||||
if (pSW) {
|
|
||||||
pSW = tFreeTSmaWrapper(pSW, false);
|
|
||||||
}
|
|
||||||
if (!(pSW = metaGetSmaInfoByTable(SMA_META(pSma), msgIter.suid, false))) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if ((pSW->number) <= 0 || !pSW->tSma) {
|
|
||||||
pSW = tFreeTSmaWrapper(pSW, false);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
pTSma = pSW->tSma;
|
|
||||||
|
|
||||||
interval.interval = pTSma->interval;
|
|
||||||
interval.intervalUnit = pTSma->intervalUnit;
|
|
||||||
interval.offset = pTSma->offset;
|
|
||||||
interval.precision = SMA_TSDB_CFG(pSma)->precision;
|
|
||||||
interval.sliding = pTSma->sliding;
|
|
||||||
interval.slidingUnit = pTSma->slidingUnit;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: process multiple tsma for one table uid
|
|
||||||
TSKEY winSKey = taosTimeTruncate(TD_ROW_KEY(row), &interval, interval.precision);
|
|
||||||
|
|
||||||
if (lastWinSKey != winSKey) {
|
|
||||||
lastWinSKey = winSKey;
|
|
||||||
if (tdSetExpireWindow(pSma, pItemsHash, pTSma->indexUid, winSKey, version) < 0) {
|
|
||||||
pSW = tFreeTSmaWrapper(pSW, false);
|
|
||||||
tdUnRefSmaStat(pSma, pStat);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
smaDebug("vgId:%d, smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window ignore as duplicated",
|
|
||||||
SMA_VID(pSma), pTSma->indexUid, winSKey);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tdUnRefSmaStat(pSma, pStat);
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Clear skeys from tsma dstVgroups in expire window.
|
|
||||||
*
|
|
||||||
* @param pSma
|
|
||||||
* @param pMsg
|
|
||||||
* @return int32_t
|
|
||||||
*/
|
|
||||||
int32_t tdClearExpireWindowImpl(SSma *pSma, const SVClrTsmaExpWndsReq *pMsg) {
|
|
||||||
int64_t indexUid = pMsg->indexUid;
|
|
||||||
|
|
||||||
if (atomic_load_16(&SMA_TSMA_NUM(pSma)) <= 0) {
|
|
||||||
smaWarn("vgId:%d, not clear expire window since no tsma for smaIndex %" PRIi64, SMA_VID(pSma), indexUid);
|
|
||||||
terrno = TSDB_CODE_TSMA_INVALID_ENV;
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE, true) < 0) {
|
|
||||||
smaWarn("vgId:%d, not clear expire window since no tsma env", SMA_VID(pSma));
|
|
||||||
terrno = TSDB_CODE_TSMA_INVALID_ENV;
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Firstly, assume that tsma can only be created on super table/normal table.
|
|
||||||
// getActiveTimeWindow
|
|
||||||
|
|
||||||
SSmaEnv *pEnv = SMA_TSMA_ENV(pSma);
|
|
||||||
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
|
||||||
SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv);
|
|
||||||
|
|
||||||
ASSERT(pEnv && pStat && pItemsHash);
|
|
||||||
|
|
||||||
// basic procedure
|
|
||||||
// TODO: optimization
|
|
||||||
tdRefSmaStat(pSma, pStat);
|
|
||||||
|
|
||||||
SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
|
|
||||||
if (!pItem || !(pItem = *(SSmaStatItem **)pItem)) {
|
|
||||||
smaWarn("vgId:%d, no sma item to clear expire window for smaIndex %" PRIi64, SMA_VID(pSma), indexUid);
|
|
||||||
terrno = TSDB_CODE_TSMA_NO_INDEX_IN_CACHE;
|
|
||||||
tdUnRefSmaStat(pSma, pStat);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int64_t i = 0; i < pMsg->nItems; ++i) {
|
|
||||||
const SVTsmaExpWndItem *pWndItem = &pMsg->items[i];
|
|
||||||
int64_t winSKey = pWndItem->skey;
|
|
||||||
for (int64_t j = 0; j < pWndItem->nKeys; ++j) {
|
|
||||||
winSKey += pItem->pTSma->interval;
|
|
||||||
if (taosHashRemove(pItem->expireWindows, &winSKey, sizeof(winSKey)) != 0) {
|
|
||||||
// If error occurs during taosHashRemove expire windows, remove the smaIndex from pSma->pSmaStat, thus TSDB
|
|
||||||
// would tell query module to query raw TS data. N.B.
|
|
||||||
// 1) It is assumed to be extemely little probability event of fail to taosHashPut.
|
|
||||||
// 2) This would solve the inconsistency to some extent, but not completely, unless we record all expired
|
|
||||||
// windows failed to put into hash table.
|
|
||||||
taosHashCleanup(pItem->expireWindows);
|
|
||||||
taosMemoryFreeClear(pItem->pTSma);
|
|
||||||
taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid));
|
|
||||||
smaWarn("vgId:%d, rm skey %" PRIi64 " in expire window for smaIndex %" PRIi64 " fail", SMA_VID(pSma), winSKey,
|
|
||||||
indexUid);
|
|
||||||
terrno = TSDB_CODE_TSMA_RM_SKEY_IN_HASH;
|
|
||||||
tdUnRefSmaStat(pSma, pStat);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
smaDebug("vgId:%d, rm skey %" PRIi64 " in expire window for smaIndex %" PRIi64 " success", SMA_VID(pSma), winSKey,
|
|
||||||
indexUid);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tdUnRefSmaStat(pSma, pStat);
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
|
@ -238,16 +238,13 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
if (msgType == TDMT_VND_SUBMIT) {
|
if (msgType == TDMT_VND_SUBMIT) {
|
||||||
if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0;
|
if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0;
|
||||||
|
|
||||||
if (tdUpdateExpireWindow(pTq->pVnode->pSma, msg, ver) != 0) {
|
void* data = taosMemoryMalloc(msgLen);
|
||||||
// TODO handle sma error
|
if (data == NULL) {
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
// void* data = taosMemoryMalloc(msgLen);
|
memcpy(data, msg, msgLen);
|
||||||
// if (data == NULL) {
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
// memcpy(data, msg, msgLen);
|
|
||||||
|
|
||||||
// tqProcessStreamTrigger(pTq, data);
|
tqProcessStreamTrigger(pTq, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -26,7 +26,6 @@ static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *
|
||||||
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp);
|
static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessExpWndsClrReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
|
|
||||||
|
|
||||||
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -177,12 +176,10 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
||||||
|
|
||||||
vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
|
vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
|
||||||
|
|
||||||
#if 0
|
|
||||||
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
||||||
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
|
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
// commit if need
|
// commit if need
|
||||||
if (vnodeShouldCommit(pVnode)) {
|
if (vnodeShouldCommit(pVnode)) {
|
||||||
|
@ -254,8 +251,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
|
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
|
||||||
case TDMT_STREAM_TASK_RECOVER_RSP:
|
case TDMT_STREAM_TASK_RECOVER_RSP:
|
||||||
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
|
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
|
||||||
case TDMT_VND_CLR_TSMA_EXP_WNDS:
|
|
||||||
return vnodeProcessExpWndsClrReq(pVnode, pMsg, msgLen, NULL);
|
|
||||||
default:
|
default:
|
||||||
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
||||||
return TSDB_CODE_VND_APP_ERROR;
|
return TSDB_CODE_VND_APP_ERROR;
|
||||||
|
@ -283,10 +278,7 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
||||||
// TODO
|
// TODO
|
||||||
|
|
||||||
// blockDebugShowData(data, __func__);
|
// blockDebugShowData(data, __func__);
|
||||||
#if 0
|
|
||||||
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
|
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
|
||||||
#endif
|
|
||||||
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, TD_DEBUG_SMA_ID, NULL);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
|
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
|
||||||
|
@ -900,45 +892,6 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessExpWndsClrReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
|
||||||
SVClrTsmaExpWndsReq req = {0};
|
|
||||||
SDecoder coder = {0};
|
|
||||||
|
|
||||||
if (pRsp) {
|
|
||||||
pRsp->msgType = TDMT_VND_CLR_TSMA_EXP_WNDS_RSP;
|
|
||||||
pRsp->code = TSDB_CODE_SUCCESS;
|
|
||||||
pRsp->pCont = NULL;
|
|
||||||
pRsp->contLen = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// decode and process
|
|
||||||
tDecoderInit(&coder, pReq, len);
|
|
||||||
|
|
||||||
if (tDecodeSVClrTsmaExpWndsReq(&coder, &req) < 0) {
|
|
||||||
terrno = TSDB_CODE_MSG_DECODE_ERROR;
|
|
||||||
if (pRsp) pRsp->code = terrno;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(0);
|
|
||||||
|
|
||||||
if (tdClearExpireWindow(pVnode->pSma, (const SVClrTsmaExpWndsReq *)&req) < 0) {
|
|
||||||
if (pRsp) pRsp->code = terrno;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
tDecoderClear(&coder);
|
|
||||||
vDebug("vgId:%d, success to process expWnds clear for tsma %" PRIi64 " version %" PRIi64, TD_VID(pVnode),
|
|
||||||
req.indexUid, req.version);
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
tDecoderClear(&coder);
|
|
||||||
vError("vgId:%d, success to process expWnds clear for tsma %" PRIi64 " version %" PRIi64 " since %s", TD_VID(pVnode),
|
|
||||||
req.indexUid, req.version, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));
|
vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue