[td-5175]<fix>: refactor the retry procedure while the error caused by tablemeta cache occurs.
This commit is contained in:
parent
3ee29d8aec
commit
e0046dbb25
|
@ -354,6 +354,8 @@ char* strdup_throw(const char* str);
|
|||
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src);
|
||||
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg);
|
||||
|
||||
void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -325,61 +325,6 @@ void tscAsyncResultOnError(SSqlObj* pSql) {
|
|||
|
||||
int tscSendMsgToServer(SSqlObj *pSql);
|
||||
|
||||
static int32_t updateMetaBeforeRetryQuery(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SQueryInfo* pQueryInfo) {
|
||||
// handle the invalid table error code for super table.
|
||||
// update the pExpr info, colList info, number of table columns
|
||||
// TODO Re-parse this sql and issue the corresponding subquery as an alternative for this case.
|
||||
if (pSql->retryReason == TSDB_CODE_TDB_INVALID_TABLE_ID) {
|
||||
int32_t numOfExprs = (int32_t) tscNumOfExprs(pQueryInfo);
|
||||
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
int32_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta);
|
||||
|
||||
SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
|
||||
SSchema *pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
|
||||
|
||||
for (int32_t i = 0; i < numOfExprs; ++i) {
|
||||
SSqlExpr *pExpr = &(tscExprGet(pQueryInfo, i)->base);
|
||||
|
||||
// update the table uid
|
||||
pExpr->uid = pTableMetaInfo->pTableMeta->id.uid;
|
||||
|
||||
if (pExpr->colInfo.colIndex >= 0) {
|
||||
int32_t index = pExpr->colInfo.colIndex;
|
||||
|
||||
if ((TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag) && index >= numOfCols) ||
|
||||
(TSDB_COL_IS_TAG(pExpr->colInfo.flag) && (index < 0 || index >= numOfTags))) {
|
||||
return pSql->retryReason;
|
||||
}
|
||||
|
||||
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
|
||||
if ((pTagSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) &&
|
||||
strcasecmp(pExpr->colInfo.name, pTagSchema[pExpr->colInfo.colIndex].name) != 0) {
|
||||
return pSql->retryReason;
|
||||
}
|
||||
} else if (TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag)) {
|
||||
if ((pSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) &&
|
||||
strcasecmp(pExpr->colInfo.name, pSchema[pExpr->colInfo.colIndex].name) != 0) {
|
||||
return pSql->retryReason;
|
||||
}
|
||||
} else { // do nothing for udc
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// validate the table columns information
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pQueryInfo->colList); ++i) {
|
||||
SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
|
||||
if (pCol->columnIndex >= numOfCols) {
|
||||
return pSql->retryReason;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
||||
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)param);
|
||||
if (pSql == NULL) return;
|
||||
|
@ -391,7 +336,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
|||
pRes->code = code;
|
||||
|
||||
SSqlObj *sub = (SSqlObj*) res;
|
||||
const char* msg = (sub->cmd.command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta";
|
||||
const char* msg = (sub->cmd.command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"[multi-]tableMeta";
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscError("0x%"PRIx64" get %s failed, code:%s", pSql->self, msg, tstrerror(code));
|
||||
goto _error;
|
||||
|
@ -401,31 +346,11 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
|||
if (pSql->pStream == NULL) {
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
|
||||
|
||||
// check if it is a sub-query of super table query first, if true, enter another routine
|
||||
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
|
||||
TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
|
||||
tscDebug("0x%" PRIx64 " update cached table-meta, continue to process sql and send the corresponding query", pSql->self);
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
// assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) == 0);
|
||||
|
||||
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
||||
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS);
|
||||
|
||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
taosReleaseRef(tscObjRef, pSql->self);
|
||||
return;
|
||||
}
|
||||
|
||||
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0));
|
||||
code = updateMetaBeforeRetryQuery(pSql, pTableMetaInfo, pQueryInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
// tscBuildAndSendRequest can add error into async res
|
||||
tscBuildAndSendRequest(pSql, NULL);
|
||||
taosReleaseRef(tscObjRef, pSql->self);
|
||||
return;
|
||||
} else { // continue to process normal async query
|
||||
// super table subquery failure will be ignored
|
||||
// if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
|
||||
// TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
|
||||
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
|
||||
tscDebug("0x%" PRIx64 " continue parse sql after get table-meta", pSql->self);
|
||||
|
||||
|
@ -437,7 +362,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
|||
goto _error;
|
||||
}
|
||||
|
||||
if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
|
||||
if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { // stmt insert
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
|
@ -448,17 +373,14 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
|||
}
|
||||
|
||||
(*pSql->fp)(pSql->param, pSql, code);
|
||||
} else {
|
||||
if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) {
|
||||
tscImportDataFromFile(pSql);
|
||||
} else {
|
||||
tscHandleMultivnodeInsert(pSql);
|
||||
}
|
||||
} else if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) { // file insert
|
||||
tscImportDataFromFile(pSql);
|
||||
} else { // sql string insert
|
||||
tscHandleMultivnodeInsert(pSql);
|
||||
}
|
||||
} else {
|
||||
if (pSql->retryReason != TSDB_CODE_SUCCESS) {
|
||||
tscDebug("0x%" PRIx64 " update cached table-meta, re-validate sql statement and send query again",
|
||||
pSql->self);
|
||||
tscDebug("0x%" PRIx64 " update cached table-meta, re-validate sql statement and send query again", pSql->self);
|
||||
tscResetSqlCmd(pCmd, false);
|
||||
pSql->retryReason = TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
|
@ -479,7 +401,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
|||
|
||||
taosReleaseRef(tscObjRef, pSql->self);
|
||||
return;
|
||||
}
|
||||
// }
|
||||
} else { // stream computing
|
||||
tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pCmd->command);
|
||||
|
||||
|
|
|
@ -390,33 +390,40 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
|||
pSql->cmd.insertParam.schemaAttached = 1;
|
||||
}
|
||||
|
||||
// single table query error need to be handled here.
|
||||
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
|
||||
(rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || // change the retry procedure
|
||||
/*(*/rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
|
||||
(((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || // change the retry procedure
|
||||
rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) ||
|
||||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || // change the retry procedure
|
||||
rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
|
||||
|
||||
pSql->retry++;
|
||||
tscWarn("0x%"PRIx64" it shall renew table meta, code:%s, retry:%d", pSql->self, tstrerror(rpcMsg->code), pSql->retry);
|
||||
|
||||
pSql->res.code = rpcMsg->code; // keep the previous error code
|
||||
if (pSql->retry > pSql->maxRetry) {
|
||||
tscError("0x%"PRIx64" max retry %d reached, give up", pSql->self, pSql->maxRetry);
|
||||
} else {
|
||||
// wait for a little bit moment and then retry
|
||||
// todo do not sleep in rpc callback thread, add this process into queue to process
|
||||
if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
|
||||
int32_t duration = getWaitingTimeInterval(pSql->retry);
|
||||
taosMsleep(duration);
|
||||
}
|
||||
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
|
||||
TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
|
||||
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) {
|
||||
// do nothing in case of super table subquery
|
||||
} else {
|
||||
pSql->retry += 1;
|
||||
tscWarn("0x%" PRIx64 " it shall renew table meta, code:%s, retry:%d", pSql->self, tstrerror(rpcMsg->code), pSql->retry);
|
||||
|
||||
pSql->retryReason = rpcMsg->code;
|
||||
rpcMsg->code = tscRenewTableMeta(pSql, 0);
|
||||
// if there is an error occurring, proceed to the following error handling procedure.
|
||||
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
taosReleaseRef(tscObjRef, handle);
|
||||
rpcFreeCont(rpcMsg->pCont);
|
||||
return;
|
||||
pSql->res.code = rpcMsg->code; // keep the previous error code
|
||||
if (pSql->retry > pSql->maxRetry) {
|
||||
tscError("0x%" PRIx64 " max retry %d reached, give up", pSql->self, pSql->maxRetry);
|
||||
} else {
|
||||
// wait for a little bit moment and then retry
|
||||
// todo do not sleep in rpc callback thread, add this process into queue to process
|
||||
if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
|
||||
int32_t duration = getWaitingTimeInterval(pSql->retry);
|
||||
taosMsleep(duration);
|
||||
}
|
||||
|
||||
pSql->retryReason = rpcMsg->code;
|
||||
rpcMsg->code = tscRenewTableMeta(pSql, 0);
|
||||
// if there is an error occurring, proceed to the following error handling procedure.
|
||||
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
taosReleaseRef(tscObjRef, handle);
|
||||
rpcFreeCont(rpcMsg->pCont);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2521,14 +2528,7 @@ int tscProcessDropDbRsp(SSqlObj *pSql) {
|
|||
|
||||
int tscProcessDropTableRsp(SSqlObj *pSql) {
|
||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
|
||||
|
||||
//The cached tableMeta is expired in this case, so clean it in hash table
|
||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
tNameExtractFullName(&pTableMetaInfo->name, name);
|
||||
|
||||
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
||||
tscDebug("0x%"PRIx64" remove table meta after drop table:%s, numOfRemain:%d", pSql->self, name, (int32_t) taosHashGetSize(tscTableMetaMap));
|
||||
|
||||
tscRemoveTableMetaBuf(pTableMetaInfo, pSql->self);
|
||||
tfree(pTableMetaInfo->pTableMeta);
|
||||
return 0;
|
||||
}
|
||||
|
@ -2915,9 +2915,7 @@ static void freeElem(void* p) {
|
|||
* @return status code
|
||||
*/
|
||||
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
|
||||
|
||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
|
@ -2934,15 +2932,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
|
|||
}
|
||||
|
||||
// remove stored tableMeta info in hash table
|
||||
size_t len = strlen(name);
|
||||
taosHashRemove(tscTableMetaMap, name, len);
|
||||
|
||||
if (pTableMeta->tableType == TSDB_SUPER_TABLE) {
|
||||
void* pv = taosCacheAcquireByKey(tscVgroupListBuf, name, len);
|
||||
if (pv != NULL) {
|
||||
taosCacheRelease(tscVgroupListBuf, &pv, true);
|
||||
}
|
||||
}
|
||||
tscRemoveTableMetaBuf(pTableMetaInfo, pSql->self);
|
||||
|
||||
SArray* pNameList = taosArrayInit(1, POINTER_BYTES);
|
||||
SArray* vgroupList = taosArrayInit(1, POINTER_BYTES);
|
||||
|
|
|
@ -2704,8 +2704,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
|
|||
tstrerror(pParentSql->res.code));
|
||||
|
||||
// release allocated resource
|
||||
tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor,
|
||||
pState->numOfSub);
|
||||
tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, pState->numOfSub);
|
||||
|
||||
tscFreeRetrieveSup(pSql);
|
||||
|
||||
|
@ -2713,7 +2712,35 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
|
|||
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
|
||||
|
||||
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
|
||||
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
|
||||
|
||||
int32_t code = pParentSql->res.code;
|
||||
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) {
|
||||
// remove the cached tableMeta and vgroup id list, and then parse the sql again
|
||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentSql->cmd, 0);
|
||||
tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
|
||||
|
||||
tscResetSqlCmd(&pParentSql->cmd, true);
|
||||
pParentSql->res.code = TSDB_CODE_SUCCESS;
|
||||
pParentSql->retry++;
|
||||
|
||||
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self,
|
||||
tstrerror(code), pParentSql->retry);
|
||||
|
||||
code = tsParseSql(pParentSql, true);
|
||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pParentSql->res.code = code;
|
||||
tscAsyncResultOnError(pParentSql);
|
||||
return;
|
||||
}
|
||||
|
||||
executeQuery(pParentSql, pQueryInfo);
|
||||
} else {
|
||||
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
|
||||
}
|
||||
} else { // regular super table query
|
||||
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
|
||||
tscAsyncResultOnError(pParentSql);
|
||||
|
@ -2996,7 +3023,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
|
|||
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
|
||||
assert(code == taos_errno(pSql));
|
||||
|
||||
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
|
||||
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && (code != TSDB_CODE_TDB_INVALID_TABLE_ID)) {
|
||||
tscError("0x%"PRIx64" sub:0x%"PRIx64" failed code:%s, retry:%d", pParentSql->self, pSql->self, tstrerror(code), trsupport->numOfRetry);
|
||||
|
||||
int32_t sent = 0;
|
||||
|
@ -3005,7 +3032,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
|
|||
return;
|
||||
}
|
||||
} else {
|
||||
tscError("0x%"PRIx64" sub:0x%"PRIx64" reach the max retry times, set global code:%s", pParentSql->self, pSql->self, tstrerror(code));
|
||||
tscError("0x%"PRIx64" sub:0x%"PRIx64" reach the max retry times or no need to retry, set global code:%s", pParentSql->self, pSql->self, tstrerror(code));
|
||||
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); // set global code and abort
|
||||
}
|
||||
|
||||
|
@ -3129,8 +3156,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
|
|||
}
|
||||
|
||||
pParentObj->res.code = TSDB_CODE_SUCCESS;
|
||||
// pParentObj->cmd.parseFinished = false;
|
||||
|
||||
tscResetSqlCmd(&pParentObj->cmd, false);
|
||||
|
||||
// in case of insert, redo parsing the sql string and build new submit data block for two reasons:
|
||||
|
|
|
@ -4857,3 +4857,19 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) {
|
|||
|
||||
return info;
|
||||
}
|
||||
|
||||
void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id) {
|
||||
char fname[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
tNameExtractFullName(&pTableMetaInfo->name, fname);
|
||||
|
||||
int32_t len = (int32_t) strnlen(fname, TSDB_TABLE_FNAME_LEN);
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
void* pv = taosCacheAcquireByKey(tscVgroupListBuf, fname, len);
|
||||
if (pv != NULL) {
|
||||
taosCacheRelease(tscVgroupListBuf, &pv, true);
|
||||
}
|
||||
}
|
||||
|
||||
taosHashRemove(tscTableMetaMap, fname, len);
|
||||
tscDebug("0x%"PRIx64" remove table meta %s, numOfRemain:%d", id, fname, (int32_t) taosHashGetSize(tscTableMetaMap));
|
||||
}
|
Loading…
Reference in New Issue