diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 72f36a6995..de64c5157c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2455,6 +2455,28 @@ int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder* pCoder, SVGetTsmaExpWndsReq* pReq); int32_t tEncodeSVGetTSmaExpWndsRsp(SEncoder* pCoder, const SVGetTsmaExpWndsRsp* pReq); int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder* pCoder, SVGetTsmaExpWndsRsp* pReq); +typedef struct { + int64_t nKeys; // n consecutive keys since skey + int64_t skey; +} SVTsmaExpWndItem; + +typedef struct { + int64_t indexUid; + int64_t version; // tsma result version + int64_t nItems; + SVTsmaExpWndItem items[]; +} SVClrTsmaExpWndsReq; + +typedef struct { + int64_t indexUid; + int32_t code; +} SVClrTsmaExpWndsRsp; + +int32_t tEncodeSVClrTsmaExpWndsReq(SEncoder* pCoder, const SVClrTsmaExpWndsReq* pReq); +int32_t tDecodeSVClrTsmaExpWndsReq(SDecoder* pCoder, SVClrTsmaExpWndsReq* pReq); +int32_t tEncodeSVClrTsmaExpWndsRsp(SEncoder* pCoder, const SVClrTsmaExpWndsRsp* pReq); +int32_t tDecodeSVClrTsmaExpWndsRsp(SDecoder* pCoder, SVClrTsmaExpWndsRsp* pReq); + typedef struct { int idx; } SMCreateFullTextReq; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index e922df64b3..81ddc68d5d 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -190,6 +190,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_GET_TSMA_EXP_WNDS, "vnode-get-tsma-expired-windows", SVGetTsmaExpWndsReq, SVGetTsmaExpWndsRsp) + TD_DEF_MSG_TYPE(TDMT_VND_CLR_TSMA_EXP_WNDS, "vnode-clr-tsma-expired-windows", SVClrTsmaExpWndsReq, SVClrTsmaExpWndsRsp) TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index bbd7c2e7a0..d7ebf03fe1 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2419,7 +2419,7 @@ int32_t tDeserializeSTableIndexReq(void *buf, int32_t bufLen, STableIndexReq *pR return 0; } -int32_t tSerializeSTableIndexInfo(SEncoder *pEncoder, STableIndexInfo* pInfo) { +int32_t tSerializeSTableIndexInfo(SEncoder *pEncoder, STableIndexInfo *pInfo) { if (tEncodeI8(pEncoder, pInfo->intervalUnit) < 0) return -1; if (tEncodeI8(pEncoder, pInfo->slidingUnit) < 0) return -1; if (tEncodeI64(pEncoder, pInfo->interval) < 0) return -1; @@ -2440,7 +2440,7 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp if (tEncodeI32(&encoder, num) < 0) return -1; if (num > 0) { for (int32_t i = 0; i < num; ++i) { - STableIndexInfo* pInfo = (STableIndexInfo*)taosArrayGet(pRsp->pIndex, i); + STableIndexInfo *pInfo = (STableIndexInfo *)taosArrayGet(pRsp->pIndex, i); if (tSerializeSTableIndexInfo(&encoder, pInfo) < 0) return -1; } } @@ -2489,7 +2489,6 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR return 0; } - int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -3856,7 +3855,7 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) { if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1; } - if (pSma->numOfVgroups) { // only needed in dstVgroup + if (pSma->numOfVgroups) { // only needed in dstVgroup for (int32_t v = 0; v < pSma->numOfVgroups; ++v) { if (tEncodeI32(pCoder, pSma->pVgEpSet[v].vgId) < 0) return -1; if (tEncodeI8(pCoder, pSma->pVgEpSet[v].epSet.inUse) < 0) return -1; @@ -3903,7 +3902,7 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) { } else { pSma->tagsFilter = NULL; } - if (pSma->numOfVgroups > 0) { // only needed in dstVgroup + if (pSma->numOfVgroups > 0) { // only needed in dstVgroup pSma->pVgEpSet = (SVgEpSet *)tDecoderMalloc(pCoder, pSma->numOfVgroups * sizeof(SVgEpSet)); if (!pSma->pVgEpSet) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -4018,6 +4017,49 @@ int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder *pCoder, SVGetTsmaExpWndsRsp *pReq) return 0; } +int32_t tEncodeSVClrTsmaExpWndsReq(SEncoder *pCoder, const SVClrTsmaExpWndsReq *pReq) { + if (tStartEncode(pCoder) < 0) return -1; + if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1; + if (tEncodeI64(pCoder, pReq->version) < 0) return -1; + if (tEncodeI64v(pCoder, pReq->nItems) < 0) return -1; + for (int64_t n = 0; pReq->nItems; ++n) { + if (tEncodeI64v(pCoder, pReq->items[n].nKeys) < 0) return -1; + if (tEncodeI64(pCoder, pReq->items[n].skey) < 0) return -1; + } + tEndEncode(pCoder); + return 0; +} + +int32_t tDecodeSVClrTsmaExpWndsReq(SDecoder *pCoder, SVClrTsmaExpWndsReq *pReq) { + if (tStartDecode(pCoder) < 0) return -1; + if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1; + if (tDecodeI64(pCoder, &pReq->version) < 0) return -1; + if (tDecodeI64v(pCoder, &pReq->nItems) < 0) return -1; + + for (int64_t i = 0; i < pReq->nItems; ++i) { + if (tDecodeI64v(pCoder, &pReq->items[i].nKeys) < 0) return -1; + if (tDecodeI64(pCoder, &pReq->items[i].skey) < 0) return -1; + } + tEndDecode(pCoder); + return 0; +} + +int32_t tEncodeSVClrTsmaExpWndsRsp(SEncoder *pCoder, const SVClrTsmaExpWndsRsp *pReq) { + if (tStartEncode(pCoder) < 0) return -1; + if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1; + if (tEncodeI32(pCoder, pReq->code) < 0) return -1; + tEndEncode(pCoder); + return 0; +} + +int32_t tDecodeSVClrTsmaExpWndsRsp(SDecoder *pCoder, SVClrTsmaExpWndsRsp *pReq) { + if (tStartDecode(pCoder) < 0) return -1; + if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1; + if (tDecodeI32(pCoder, &pReq->code) < 0) return -1; + tEndDecode(pCoder); + return 0; +} + int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 4c5a32536f..9c3c0af986 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -348,6 +348,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CLR_TSMA_EXP_WNDS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index dd705011c4..157162161e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -25,6 +25,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp); +static int32_t vnodeProcessExpWndsClrReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; @@ -174,7 +175,8 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp } vTrace("vgId:%d, process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); -#if 1 + +#if 0 if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; @@ -225,6 +227,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("message in fetch queue is processing"); char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + switch (pMsg->msgType) { case TDMT_VND_FETCH: return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0); @@ -236,13 +239,10 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0); case TDMT_VND_QUERY_HEARTBEAT: return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0); - case TDMT_VND_TABLE_META: return vnodeGetTableMeta(pVnode, pMsg); - case TDMT_VND_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId); - case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH: @@ -253,6 +253,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RECOVER_RSP: return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg); + case TDMT_VND_CLR_TSMA_EXP_WNDS: + return vnodeProcessExpWndsClrReq(pVnode, pMsg, msgLen, NULL); default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; @@ -896,3 +898,42 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void return 0; } + +static int32_t vnodeProcessExpWndsClrReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) { + SVClrTsmaExpWndsReq req = {0}; + SDecoder coder = {0}; + + if (pRsp) { + pRsp->msgType = TDMT_VND_CLR_TSMA_EXP_WNDS_RSP; + pRsp->code = TSDB_CODE_SUCCESS; + pRsp->pCont = NULL; + pRsp->contLen = 0; + } + + // decode and process + tDecoderInit(&coder, pReq, len); + + if (tDecodeSVClrTsmaExpWndsReq(&coder, &req) < 0) { + terrno = TSDB_CODE_MSG_DECODE_ERROR; + if (pRsp) pRsp->code = terrno; + goto _err; + } + + ASSERT(0); + + // if (tdProcess(pVnode->pSma, version, (const char *)&req) < 0) { + // if (pRsp) pRsp->code = terrno; + // goto _err; + // } + + tDecoderClear(&coder); + vDebug("vgId:%d, success to process expWnds clear for tsma %" PRIi64 " version %" PRIi64, TD_VID(pVnode), + req.indexUid, req.version); + return 0; + +_err: + tDecoderClear(&coder); + vError("vgId:%d, success to process expWnds clear for tsma %" PRIi64 " version %" PRIi64 " since %s", TD_VID(pVnode), + req.indexUid, req.version, terrstr()); + return -1; +} \ No newline at end of file