fix:add excluded msg for delete in tmq

This commit is contained in:
wangmm0220 2024-03-04 14:58:13 +08:00
parent d62b82c295
commit c7d115d8aa
8 changed files with 19 additions and 3 deletions

View File

@ -3813,8 +3813,10 @@ typedef struct {
uint32_t phyLen;
char* sql;
char* msg;
int8_t source;
} SVDeleteReq;
extern int8_t deleteFromTaosx;
int32_t tSerializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq);
int32_t tDeserializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq);
@ -3834,6 +3836,7 @@ typedef struct SDeleteRes {
char tableFName[TSDB_TABLE_NAME_LEN];
char tsColName[TSDB_COL_NAME_LEN];
int64_t ctimeMs; // fill by vnode
int8_t source;
} SDeleteRes;
int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);

View File

@ -1256,7 +1256,9 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
req.tsColName, req.skey, req.tsColName, req.ekey);
deleteFromTaosx = TD_REQ_FROM_TAOX;
TAOS_RES* res = taos_query(taos, sql);
deleteFromTaosx = TD_REQ_FROM_APP;
SRequestObj* pRequest = (SRequestObj*)res;
code = pRequest->code;
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {

View File

@ -7148,6 +7148,7 @@ int32_t tDecodeSVDropTSmaReq(SDecoder *pCoder, SVDropTSmaReq *pReq) {
return 0;
}
int8_t deleteFromTaosx = TD_REQ_FROM_APP;
int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
@ -7165,6 +7166,7 @@ int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (tEncodeBinary(&encoder, pReq->msg, pReq->phyLen) < 0) return -1;
if (tEncodeI8(&encoder, pReq->source) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -7201,6 +7203,9 @@ int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->msg, &msgLen) < 0) return -1;
pReq->phyLen = msgLen;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI8(&decoder, &pReq->source) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
@ -8400,6 +8405,7 @@ int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
if (tEncodeCStr(pCoder, pRes->tableFName) < 0) return -1;
if (tEncodeCStr(pCoder, pRes->tsColName) < 0) return -1;
if (tEncodeI64(pCoder, pRes->ctimeMs) < 0) return -1;
if (tEncodeI8(pCoder, pRes->source) < 0) return -1;
return 0;
}
@ -8424,6 +8430,9 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) {
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeI64(pCoder, &pRes->ctimeMs) < 0) return -1;
}
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeI8(pCoder, &pRes->source) < 0) return -1;
}
return 0;
}

View File

@ -263,8 +263,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
} else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq)
} else if (pHead->msgType == TDMT_VND_DELETE) {
fetchVer++;
continue;
PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes)
}
}

View File

@ -715,6 +715,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD
uint64_t tId = req.taskId;
int64_t rId = 0;
int32_t eId = -1;
pRes->source = req.source;
SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info};
QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql);

View File

@ -1086,6 +1086,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
req.sqlLen = strlen(pJob->sql);
req.sql = (char *)pJob->sql;
req.msg = pTask->msg;
req.source = deleteFromTaosx;
msgSize = tSerializeSVDeleteReq(NULL, 0, &req);
msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {

View File

@ -117,7 +117,7 @@ echo "supportVnodes 1024" >> $TAOS_CFG
echo "statusInterval 1" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "debugFlag 0" >> $TAOS_CFG
echo "debugFlag 135" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "uDebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG

View File

@ -574,6 +574,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.consume.excluded", "1");
if (g_conf.snapShot) {
tmq_conf_set(conf, "experimental.snapshot.enable", "true");