fix:add excluded msg for delete in tmq

This commit is contained in:
wangmm0220 2024-03-05 11:25:43 +08:00
parent 5d2d8d5f48
commit af6d6b69c3
10 changed files with 17 additions and 14 deletions

View File

@ -3816,7 +3816,6 @@ typedef struct {
int8_t source;
} SVDeleteReq;
extern int8_t deleteFromTaosx;
int32_t tSerializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq);
int32_t tDeserializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq);

View File

@ -78,6 +78,7 @@ typedef struct SSchedulerReq {
void* chkKillParam;
SExecResult* pExecRes;
void** pFetchRes;
int8_t source;
} SSchedulerReq;
int32_t schedulerInit(void);

View File

@ -284,6 +284,7 @@ typedef struct SRequestObj {
void* pWrapper;
SMetaData parseMeta;
char* effectiveUser;
int8_t source;
} SRequestObj;
typedef struct SSyncQueryParam {
@ -306,10 +307,10 @@ void doFreeReqResultInfo(SReqResultInfo* pResInfo);
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq);
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code);
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly);
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source);
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid);
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly);
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, int8_t source);
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
int64_t reqid);
void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param);

View File

@ -729,6 +729,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
.chkKillFp = chkRequestKilled,
.chkKillParam = (void*)pRequest->self,
.pExecRes = &res,
.source = pRequest->source,
};
int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
@ -1198,6 +1199,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
.chkKillFp = chkRequestKilled,
.chkKillParam = (void*)pRequest->self,
.pExecRes = NULL,
.source = pRequest->source,
};
code = schedulerExecJob(&req, &pRequest->body.queryJob);
taosArrayDestroy(pNodeList);
@ -2461,7 +2463,7 @@ void syncQueryFn(void* param, void* res, int32_t code) {
tsem_post(&pParam->sem);
}
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) {
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, int8_t source) {
if (sql == NULL || NULL == fp) {
terrno = TSDB_CODE_INVALID_PARA;
if (fp) {
@ -2487,6 +2489,7 @@ void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp,
return;
}
pRequest->source = source;
pRequest->body.queryFp = fp;
doAsyncQuery(pRequest, false);
}
@ -2521,7 +2524,7 @@ void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_
doAsyncQuery(pRequest, false);
}
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
@ -2536,7 +2539,7 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
}
tsem_init(&param->sem, 0, 0);
taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly);
taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source);
tsem_wait(&param->sem);
SRequestObj* pRequest = NULL;

View File

@ -401,7 +401,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
return pResInfo->userFields;
}
TAOS_RES *taos_query(TAOS *taos, const char *sql) { return taosQueryImpl(taos, sql, false); }
TAOS_RES *taos_query(TAOS *taos, const char *sql) { return taosQueryImpl(taos, sql, false, TD_REQ_FROM_APP); }
TAOS_RES *taos_query_with_reqid(TAOS *taos, const char *sql, int64_t reqid) {
return taosQueryImplWithReqid(taos, sql, false, reqid);
}
@ -827,7 +827,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
}
int taos_validate_sql(TAOS *taos, const char *sql) {
TAOS_RES *pObj = taosQueryImpl(taos, sql, true);
TAOS_RES *pObj = taosQueryImpl(taos, sql, true, TD_REQ_FROM_APP);
int code = taos_errno(pObj);
@ -1125,7 +1125,7 @@ void continueInsertFromCsv(SSqlCallbackWrapper *pWrapper, SRequestObj *pRequest)
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
int64_t connId = *(int64_t *)taos;
tscDebug("taos_query_a start with sql:%s", sql);
taosAsyncQueryImpl(connId, sql, fp, param, false);
taosAsyncQueryImpl(connId, sql, fp, param, false, TD_REQ_FROM_APP);
tscDebug("taos_query_a end with sql:%s", sql);
}

View File

@ -1256,9 +1256,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
req.tsColName, req.skey, req.tsColName, req.ekey);
deleteFromTaosx = TD_REQ_FROM_TAOX;
TAOS_RES* res = taos_query(taos, sql);
deleteFromTaosx = TD_REQ_FROM_APP;
TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX);
SRequestObj* pRequest = (SRequestObj*)res;
code = pRequest->code;
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {

View File

@ -7148,7 +7148,6 @@ int32_t tDecodeSVDropTSmaReq(SDecoder *pCoder, SVDropTSmaReq *pReq) {
return 0;
}
int8_t deleteFromTaosx = TD_REQ_FROM_APP;
int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {

View File

@ -304,6 +304,7 @@ typedef struct SSchJob {
SSchResInfo userRes;
char *sql;
SQueryProfileSummary summary;
int8_t source;
} SSchJob;
typedef struct SSchTaskCtx {

View File

@ -746,6 +746,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
pJob->chkKillParam = pReq->chkKillParam;
pJob->userRes.execFp = pReq->execFp;
pJob->userRes.cbParam = pReq->cbParam;
pJob->source = pReq->source;
if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);

View File

@ -1086,7 +1086,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
req.sqlLen = strlen(pJob->sql);
req.sql = (char *)pJob->sql;
req.msg = pTask->msg;
req.source = deleteFromTaosx;
req.source = pJob->source;
msgSize = tSerializeSVDeleteReq(NULL, 0, &req);
msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {