enh: rsma delete logic
This commit is contained in:
parent
572eb691fb
commit
ff3621f382
|
@ -160,7 +160,7 @@ int32_t tqResetStreamTaskStatus(STQ* pTq);
|
|||
int32_t tqStopStreamTasks(STQ* pTq);
|
||||
|
||||
// tq util
|
||||
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int8_t type);
|
||||
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type);
|
||||
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
||||
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
|
||||
int32_t type, int64_t sver, int64_t ever);
|
||||
|
|
|
@ -17,13 +17,17 @@
|
|||
#include "tq.h"
|
||||
#include "tstream.h"
|
||||
|
||||
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
|
||||
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
|
||||
#define RSMA_FETCH_DELAY_MAX (120000) // ms
|
||||
#define RSMA_FETCH_ACTIVE_MAX (1000) // ms
|
||||
#define RSMA_FETCH_INTERVAL (5000) // ms
|
||||
#define RSMA_SUBMIT_HEAD_LEN (13) // type(int8_t) + len(int32_t) + version(int64_t)
|
||||
#define RSMA_TASK_FLAG "rsma"
|
||||
#define RSMA_EXEC_SMOOTH_SIZE (100) // cnt
|
||||
#define RSMA_EXEC_BATCH_SIZE (1024) // cnt
|
||||
#define RSMA_FETCH_DELAY_MAX (120000) // ms
|
||||
#define RSMA_FETCH_ACTIVE_MAX (1000) // ms
|
||||
#define RSMA_FETCH_INTERVAL (5000) // ms
|
||||
#define RSMA_EXEC_MSG_HLEN (13) // type(int8_t) + len(int32_t) + version(int64_t)
|
||||
#define RSMA_EXEC_MSG_TYPE(msg) (*(int8_t *)(msg))
|
||||
#define RSMA_EXEC_MSG_LEN(msg) (*(int32_t *)POINTER_SHIFT((msg), sizeof(int8_t)))
|
||||
#define RSMA_EXEC_MSG_VER(msg) (*(int64_t *)POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t)))
|
||||
#define RSMA_EXEC_MSG_BODY(msg) (POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t) + sizeof(int64_t)))
|
||||
#define RSMA_TASK_FLAG "rsma"
|
||||
|
||||
#define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel)
|
||||
|
||||
|
@ -723,7 +727,7 @@ _exit:
|
|||
*/
|
||||
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
|
||||
SRSmaInfo *pInfo, tb_uid_t suid) {
|
||||
int32_t size = RSMA_SUBMIT_HEAD_LEN + len; // header + payload
|
||||
int32_t size = RSMA_EXEC_MSG_HLEN + len; // header + payload
|
||||
void *qItem = taosAllocateQitem(size, DEF_QITEM, 0);
|
||||
|
||||
if (!qItem) {
|
||||
|
@ -752,7 +756,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *p
|
|||
}
|
||||
|
||||
// smoothing consume
|
||||
int32_t n = nItems / RSMA_QTASKEXEC_SMOOTH_SIZE;
|
||||
int32_t n = nItems / RSMA_EXEC_SMOOTH_SIZE;
|
||||
if (n > 1) {
|
||||
if (n > 10) {
|
||||
n = 10;
|
||||
|
@ -1379,7 +1383,7 @@ static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type) {
|
|||
if (type == STREAM_INPUT__MERGED_SUBMIT) {
|
||||
for (int32_t i = 0; i < arrSize; ++i) {
|
||||
SPackedData *packData = TARRAY_GET_ELEM(pItems, i);
|
||||
taosFreeQitem(POINTER_SHIFT(packData->msgStr, -RSMA_SUBMIT_HEAD_LEN));
|
||||
taosFreeQitem(POINTER_SHIFT(packData->msgStr, -RSMA_EXEC_MSG_HLEN));
|
||||
}
|
||||
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||
for (int32_t i = 0; i < arrSize; ++i) {
|
||||
|
@ -1451,11 +1455,6 @@ _err:
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
#define RSMA_SUBMIT_MSG_TYPE(msg) (*(int8_t *)(msg))
|
||||
#define RSMA_SUBMIT_MSG_LEN(msg) (*(int32_t *)POINTER_SHIFT((msg), sizeof(int8_t)))
|
||||
#define RSMA_SUBMIT_MSG_VER(msg) (*(int64_t *)POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t)))
|
||||
#define RSMA_SUBMIT_MSG_BODY(msg) (POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t) + sizeof(int64_t)))
|
||||
|
||||
static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) {
|
||||
void *msg = NULL;
|
||||
int8_t resume = 0;
|
||||
|
@ -1470,16 +1469,16 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
|
|||
while (1) {
|
||||
taosGetQitem(qall, (void **)&msg);
|
||||
if (msg) {
|
||||
int8_t inputType = RSMA_SUBMIT_MSG_TYPE(msg);
|
||||
int8_t inputType = RSMA_EXEC_MSG_TYPE(msg);
|
||||
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||
if (nDelete > 0) {
|
||||
resume = 1;
|
||||
break;
|
||||
}
|
||||
_resume_submit:
|
||||
packData.msgLen = RSMA_SUBMIT_MSG_LEN(msg);
|
||||
packData.ver = RSMA_SUBMIT_MSG_VER(msg);
|
||||
packData.msgStr = RSMA_SUBMIT_MSG_BODY(msg);
|
||||
packData.msgLen = RSMA_EXEC_MSG_LEN(msg);
|
||||
packData.ver = RSMA_EXEC_MSG_VER(msg);
|
||||
packData.msgStr = RSMA_EXEC_MSG_BODY(msg);
|
||||
if (!taosArrayPush(pSubmitArr, &packData)) {
|
||||
taosFreeQitem(msg);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1492,8 +1491,11 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
|
|||
break;
|
||||
}
|
||||
_resume_delete:
|
||||
extractDelDataBlock(RSMA_SUBMIT_MSG_BODY(msg), RSMA_SUBMIT_MSG_LEN(msg), RSMA_SUBMIT_MSG_VER(msg),
|
||||
&packData.pDataBlock, 1);
|
||||
if ((terrno = extractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), RSMA_EXEC_MSG_VER(msg),
|
||||
&packData.pDataBlock, 1))) {
|
||||
taosFreeQitem(msg);
|
||||
goto _err;
|
||||
}
|
||||
if (!taosArrayPush(pSubmitArr, &packData)) {
|
||||
taosFreeQitem(msg);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1518,17 +1520,17 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
|
|||
}
|
||||
tdFreeRSmaSubmitItems(pSubmitArr, inputType);
|
||||
}
|
||||
} else {
|
||||
goto _rtn;
|
||||
}
|
||||
|
||||
if (resume == 0) {
|
||||
goto _rtn;
|
||||
} else if (resume == 1) {
|
||||
if (resume == 1) {
|
||||
nSubmit = 0;
|
||||
nDelete = 0;
|
||||
resume = 0;
|
||||
tdFreeRSmaSubmitItems(pSubmitArr, STREAM_INPUT__REF_DATA_BLOCK);
|
||||
goto _resume_submit;
|
||||
} else {
|
||||
} else if (resume == 2) {
|
||||
nSubmit = 0;
|
||||
nDelete = 0;
|
||||
resume = 0;
|
||||
|
@ -1580,7 +1582,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
|||
}
|
||||
|
||||
if (!(pSubmitArr =
|
||||
taosArrayInit(TMIN(RSMA_SUBMIT_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), sizeof(SPackedData)))) {
|
||||
taosArrayInit(TMIN(RSMA_EXEC_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), sizeof(SPackedData)))) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
|
|
@ -399,7 +399,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int8_t type) {
|
||||
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type) {
|
||||
SDecoder* pCoder = &(SDecoder){0};
|
||||
SDeleteRes* pRes = &(SDeleteRes){0};
|
||||
|
||||
|
@ -452,8 +452,10 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void**
|
|||
|
||||
((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
|
||||
((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pDelBlock;
|
||||
} else {
|
||||
} else if (type == 1) {
|
||||
*pRefBlock = pDelBlock;
|
||||
} else {
|
||||
ASSERTS(0, "unknown type:%d", type);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
Loading…
Reference in New Issue