[td-535]
This commit is contained in:
parent
6e0556d540
commit
b3389e268c
|
@ -443,15 +443,17 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
|||
}
|
||||
|
||||
if (pSql->pStream == NULL) {
|
||||
// check if it is a sub-query of super table query first, if true, enter another routine
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
|
||||
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) {
|
||||
|
||||
// check if it is a sub-query of super table query first, if true, enter another routine
|
||||
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY)) {
|
||||
tscTrace("%p update table meta in local cache, continue to process sql and send corresponding subquery", pSql);
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
if (pTableMetaInfo->pTableMeta == NULL){
|
||||
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
||||
assert(code == TSDB_CODE_SUCCESS);
|
||||
}
|
||||
}
|
||||
|
||||
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vgroupIndex >= 0 && pSql->param != NULL);
|
||||
|
||||
|
@ -461,32 +463,37 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
|||
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
|
||||
tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0);
|
||||
|
||||
tscTrace("%p get metricMeta during super table query successfully", pSql);
|
||||
|
||||
code = tscGetSTableVgroupInfo(pSql, 0);
|
||||
pRes->code = code;
|
||||
|
||||
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
|
||||
} else { // normal async query continues
|
||||
// NOTE: the vgroupInfo for the queried super table must be existed here.
|
||||
assert(pTableMetaInfo->vgroupList != NULL);
|
||||
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
|
||||
return;
|
||||
}
|
||||
} else { // continue to process normal async query
|
||||
if (pCmd->parseFinished) {
|
||||
tscTrace("%p re-send data to vnode in table Meta callback since sql parsed completed", pSql);
|
||||
|
||||
tscTrace("%p update table meta in local cache, continue to process sql and send corresponding query", pSql);
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
||||
assert(code == TSDB_CODE_SUCCESS);
|
||||
|
||||
if (pTableMetaInfo->pTableMeta) {
|
||||
// todo update the submit message according to the new table meta
|
||||
// 1. table uid, 2. ip address
|
||||
code = tscSendMsgToServer(pSql);
|
||||
if (code == TSDB_CODE_SUCCESS) return;
|
||||
|
||||
// if failed to process sql, go to error handler
|
||||
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
|
||||
return;
|
||||
}
|
||||
// // todo update the submit message according to the new table meta
|
||||
// // 1. table uid, 2. ip address
|
||||
// code = tscSendMsgToServer(pSql);
|
||||
// if (code == TSDB_CODE_SUCCESS) return;
|
||||
// }
|
||||
} else {
|
||||
tscTrace("%p continue parse sql after get table meta", pSql);
|
||||
|
||||
code = tsParseSql(pSql, false);
|
||||
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STMT_INSERT) == TSDB_QUERY_TYPE_STMT_INSERT) {
|
||||
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
||||
assert(code == TSDB_CODE_SUCCESS && pTableMetaInfo->pTableMeta != NULL);
|
||||
|
||||
(*pSql->fp)(pSql->param, pSql, code);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -430,7 +430,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {
|
|||
|
||||
/*
|
||||
* here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
|
||||
* sub-queries not correctly released and master sql object of metric query reaches an abnormal state.
|
||||
* sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
|
||||
*/
|
||||
pSql->pSubs[i]->res.code = TSDB_CODE_QUERY_CANCELLED;
|
||||
//taosStopRpcConn(pSql->pSubs[i]->thandle);
|
||||
|
@ -564,7 +564,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
|
|||
|
||||
pQueryMsg->numOfTables = htonl(1); // set the number of tables
|
||||
pMsg += sizeof(STableIdInfo);
|
||||
} else {
|
||||
} else { // it is a subquery of the super table query, this IP info is acquired from vgroupInfo
|
||||
int32_t index = pTableMetaInfo->vgroupIndex;
|
||||
int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
|
||||
assert(index >= 0 && index < numOfVgroups);
|
||||
|
@ -1821,7 +1821,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
|||
return TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
tscTrace("%p recv table meta: %"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->uid, pTableMeta->sid, pTableMetaInfo->name);
|
||||
tscTrace("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->uid, pTableMeta->sid, pTableMetaInfo->name);
|
||||
free(pTableMeta);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2388,56 +2388,26 @@ int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
|
|||
return tscGetTableMeta(pSql, pTableMetaInfo);
|
||||
}
|
||||
|
||||
/*
|
||||
* in handling the renew metermeta problem during insertion,
|
||||
*
|
||||
* If the meter is created on demand during insertion, the routine usually waits for a short
|
||||
* period to re-issue the getMeterMeta msg, in which makes a greater change that vnode has
|
||||
* successfully created the corresponding table.
|
||||
*/
|
||||
static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
|
||||
if (pCmd->command == TSDB_SQL_INSERT) {
|
||||
taosMsleep(50); // todo: global config
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* in renew metermeta, do not retrieve metadata in cache.
|
||||
* retrieve table meta from mnode, and update the local table meta cache.
|
||||
* @param pSql sql object
|
||||
* @param tableId meter id
|
||||
* @param tableId table full name
|
||||
* @return status code
|
||||
*/
|
||||
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
|
||||
int code = 0;
|
||||
|
||||
// handle table meta renew process
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
||||
/*
|
||||
* 1. only update the metermeta in force model metricmeta is not updated
|
||||
* 2. if get metermeta failed, still get the metermeta
|
||||
*/
|
||||
if (pTableMetaInfo->pTableMeta == NULL || !tscQueryOnSTable(pCmd)) {
|
||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
if (pTableMetaInfo->pTableMeta) {
|
||||
tscTrace("%p update table meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
|
||||
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->uid, pTableMeta);
|
||||
}
|
||||
|
||||
tscWaitingForCreateTable(pCmd);
|
||||
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
|
||||
|
||||
code = getTableMetaFromMgmt(pSql, pTableMetaInfo); // todo ??
|
||||
} else {
|
||||
tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
|
||||
tscGetNumOfTags(pTableMetaInfo->pTableMeta), pCmd->numOfCols, pTableMetaInfo->pTableMeta->uid,
|
||||
pTableMetaInfo->pTableMeta);
|
||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
if (pTableMetaInfo->pTableMeta) {
|
||||
tscTrace("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
|
||||
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->uid, pTableMeta);
|
||||
}
|
||||
|
||||
return code;
|
||||
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
|
||||
return getTableMetaFromMgmt(pSql, pTableMetaInfo);
|
||||
}
|
||||
|
||||
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
|
||||
|
|
Loading…
Reference in New Issue