enh: support delete tsma interval
This commit is contained in:
parent
72cdca8570
commit
4097b860d3
|
@ -29,7 +29,7 @@ int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) {
|
||||
smaWarn("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
|
||||
smaError("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -346,6 +346,43 @@ _end:
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tsmaProcessDelReq(SSma *pSma, int64_t indexUid, SBatchDeleteReq *pDelReq) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (taosArrayGetSize(pDelReq->deleteReqs) > 0) {
|
||||
int32_t len = 0;
|
||||
tEncodeSize(tEncodeSBatchDeleteReq, pDelReq, len, code);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
void *pBuf = rpcMallocCont(len + sizeof(SMsgHead));
|
||||
if (!pBuf) {
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len);
|
||||
tEncodeSBatchDeleteReq(&encoder, pDelReq);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode);
|
||||
|
||||
SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = pBuf, .contLen = len + sizeof(SMsgHead)};
|
||||
code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
taosArrayDestroy(pDelReq->deleteReqs);
|
||||
if (code) {
|
||||
smaError("vgId:%d, failed to process delete req for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid,
|
||||
tstrerror(code));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Insert/Update Time-range-wise SMA data.
|
||||
*
|
||||
|
@ -355,7 +392,6 @@ _end:
|
|||
*/
|
||||
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||
const SArray *pDataBlocks = (const SArray *)msg;
|
||||
// TODO: destroy SSDataBlocks(msg)
|
||||
if (!pDataBlocks) {
|
||||
terrno = TSDB_CODE_TSMA_INVALID_PTR;
|
||||
smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is NULL", SMA_VID(pSma));
|
||||
|
@ -419,8 +455,10 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
|
|||
goto _err;
|
||||
}
|
||||
|
||||
// TODO deleteReq
|
||||
taosArrayDestroy(deleteReq.deleteReqs);
|
||||
if ((terrno = tsmaProcessDelReq(pSma, indexUid, &deleteReq)) != 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) {
|
||||
terrno = TSDB_CODE_APP_ERROR;
|
||||
|
|
|
@ -593,9 +593,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: remove the function
|
||||
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
||||
// TODO
|
||||
// blockDebugShowDataBlocks(data, __func__);
|
||||
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
|
||||
}
|
||||
|
|
|
@ -414,6 +414,21 @@ if $data05 != 30.000000000 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
sql delete from stb;
|
||||
|
||||
print =============== query after delete in common vgroups
|
||||
sql select _wstart, _wend, min(c1),max(c2),max(c1),max(c3) from stb interval(5m,10s) sliding(5m) order by _wstart;
|
||||
if $rows != 0 then
|
||||
print rows $rows != 0
|
||||
return -1
|
||||
endi
|
||||
|
||||
sleep 2000
|
||||
print =============== query after delete in designated vgroups
|
||||
sql select _wend, min(c1),max(c2),max(c1) from stb interval(5m,10s) sliding(5m) order by _wstart;
|
||||
if $rows != 0 then
|
||||
print rows $rows != 0
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue