Merge pull request #18768 from taosdata/fix/TD-20978
fix: client retry insert based on some error code
This commit is contained in:
commit
b9447601fe
|
@ -796,9 +796,10 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c
|
|||
SQuery *pQuery = pRequest->pQuery;
|
||||
|
||||
pRequest->metric.ctgEnd = taosGetTimestampUs();
|
||||
qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId);
|
||||
qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, tstrerror(code));
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pWrapper->pCatalogReq->forceUpdate = false;
|
||||
code = qContinueParseSql(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery);
|
||||
}
|
||||
|
||||
|
|
|
@ -547,6 +547,14 @@ typedef struct SCtgOperation {
|
|||
#define ctgDebug(param, ...) qDebug("CTG:%p " param, pCtg, __VA_ARGS__)
|
||||
#define ctgTrace(param, ...) qTrace("CTG:%p " param, pCtg, __VA_ARGS__)
|
||||
|
||||
#define ctgTaskFatal(param, ...) qFatal("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
|
||||
#define ctgTaskError(param, ...) qError("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
|
||||
#define ctgTaskWarn(param, ...) qWarn("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
|
||||
#define ctgTaskInfo(param, ...) qInfo("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
|
||||
#define ctgTaskDebug(param, ...) qDebug("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
|
||||
#define ctgTaskTrace(param, ...) qTrace("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
|
||||
|
||||
|
||||
#define CTG_LOCK_DEBUG(...) \
|
||||
do { \
|
||||
if (gCTGDebug.lockEnable) { \
|
||||
|
|
|
@ -1094,6 +1094,9 @@ _return:
|
|||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
if (code) {
|
||||
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, tstrerror(code));
|
||||
}
|
||||
if (pTask->res || code) {
|
||||
ctgHandleTaskEnd(pTask, code);
|
||||
}
|
||||
|
@ -1124,7 +1127,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
SVgroupInfo vgInfo = {0};
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, pName, &vgInfo));
|
||||
|
||||
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
|
||||
ctgTaskDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
|
||||
|
||||
*vgId = vgInfo.vgId;
|
||||
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
|
||||
|
@ -1144,7 +1147,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
SVgroupInfo vgInfo = {0};
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo));
|
||||
|
||||
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
|
||||
ctgTaskDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
|
||||
|
||||
*vgId = vgInfo.vgId;
|
||||
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
|
||||
|
@ -1162,7 +1165,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
ctgError("no tbmeta got, tbName:%s", tNameGetTableName(pName));
|
||||
ctgTaskError("no tbmeta got, tbName:%s", tNameGetTableName(pName));
|
||||
ctgRemoveTbMetaFromCache(pCtg, pName, false);
|
||||
|
||||
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
|
||||
|
@ -1180,7 +1183,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
|
||||
|
||||
if (CTG_IS_META_NULL(pOut->metaType)) {
|
||||
ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName));
|
||||
ctgTaskError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName));
|
||||
ctgRemoveTbMetaFromCache(pCtg, pName, false);
|
||||
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
|
||||
}
|
||||
|
@ -1190,7 +1193,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
}
|
||||
|
||||
if (CTG_IS_META_TABLE(pOut->metaType) && TSDB_SUPER_TABLE == pOut->tbMeta->tableType) {
|
||||
ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName));
|
||||
ctgTaskDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName));
|
||||
|
||||
taosMemoryFreeClear(pOut->tbMeta);
|
||||
|
||||
|
@ -1207,11 +1210,11 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
STableMeta* stbMeta = NULL;
|
||||
(void)ctgReadTbMetaFromCache(pCtg, &stbCtx, &stbMeta);
|
||||
if (stbMeta && stbMeta->sversion >= pOut->tbMeta->sversion) {
|
||||
ctgDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName));
|
||||
ctgTaskDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName));
|
||||
exist = 1;
|
||||
taosMemoryFreeClear(stbMeta);
|
||||
} else {
|
||||
ctgDebug("need to get/update stb meta, tbName:%s", tNameGetTableName(pName));
|
||||
ctgTaskDebug("need to get/update stb meta, tbName:%s", tNameGetTableName(pName));
|
||||
taosMemoryFreeClear(pOut->tbMeta);
|
||||
taosMemoryFreeClear(stbMeta);
|
||||
}
|
||||
|
@ -1225,7 +1228,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
break;
|
||||
}
|
||||
default:
|
||||
ctgError("invalid reqType %d", reqType);
|
||||
ctgTaskError("invalid reqType %d", reqType);
|
||||
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
|
||||
}
|
||||
|
||||
|
@ -1280,6 +1283,7 @@ _return:
|
|||
TSWAP(pTask->res, ctx->pResList);
|
||||
taskDone = true;
|
||||
}
|
||||
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, tstrerror(code));
|
||||
}
|
||||
|
||||
if (pTask->res && taskDone) {
|
||||
|
|
|
@ -44,6 +44,7 @@ typedef struct SInsertParseContext {
|
|||
SParsedDataColInfo tags; // for stmt
|
||||
bool missCache;
|
||||
bool usingDuplicateTable;
|
||||
bool forceUpdate;
|
||||
} SInsertParseContext;
|
||||
|
||||
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
|
||||
|
@ -829,6 +830,11 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, boo
|
|||
}
|
||||
|
||||
static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
|
||||
if (pCxt->forceUpdate) {
|
||||
pCxt->missCache = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache);
|
||||
|
@ -844,6 +850,11 @@ static int32_t preParseUsingTableName(SInsertParseContext* pCxt, SVnodeModifOpSt
|
|||
}
|
||||
|
||||
static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
|
||||
if (pCxt->forceUpdate) {
|
||||
pCxt->missCache = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = getTableMeta(pCxt, &pStmt->usingTableName, true, &pStmt->pTableMeta, &pCxt->missCache);
|
||||
|
@ -1909,6 +1920,7 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal
|
|||
.msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen},
|
||||
.missCache = false,
|
||||
.usingDuplicateTable = false,
|
||||
.forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)
|
||||
};
|
||||
|
||||
int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
|
||||
|
|
Loading…
Reference in New Issue