[td-5175]
This commit is contained in:
parent
c7ae4f5789
commit
f12d252116
|
@ -211,13 +211,6 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
|
||||||
pSql->fp = tscAsyncFetchRowsProxy;
|
pSql->fp = tscAsyncFetchRowsProxy;
|
||||||
pSql->param = param;
|
pSql->param = param;
|
||||||
|
|
||||||
if (pRes->qId == 0) {
|
|
||||||
tscError("qhandle is invalid");
|
|
||||||
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
|
|
||||||
tscAsyncResultOnError(pSql);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
tscResetForNextRetrieve(pRes);
|
tscResetForNextRetrieve(pRes);
|
||||||
|
|
||||||
// handle outer query based on the already retrieved nest query results.
|
// handle outer query based on the already retrieved nest query results.
|
||||||
|
@ -232,6 +225,13 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pRes->qId == 0) {
|
||||||
|
tscError("qhandle is invalid");
|
||||||
|
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
|
||||||
|
tscAsyncResultOnError(pSql);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
|
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
|
||||||
tscFetchDatablockForSubquery(pSql);
|
tscFetchDatablockForSubquery(pSql);
|
||||||
} else if (pRes->completed) {
|
} else if (pRes->completed) {
|
||||||
|
|
|
@ -392,14 +392,17 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||||
|
|
||||||
// single table query error need to be handled here.
|
// single table query error need to be handled here.
|
||||||
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
|
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
|
||||||
(((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || // change the retry procedure
|
(((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
|
||||||
rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) ||
|
rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) ||
|
||||||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || // change the retry procedure
|
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
|
||||||
rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
|
rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
|
||||||
|
|
||||||
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
|
// 1. super table subquery
|
||||||
|
// 2. nest queries are all not updated the tablemeta and retry parse the sql after cleanup local tablemeta/vgroup id buffer
|
||||||
|
if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
|
||||||
TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
|
TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
|
||||||
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) {
|
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) ||
|
||||||
|
(pQueryInfo->pUpstream != NULL)) {
|
||||||
// do nothing in case of super table subquery
|
// do nothing in case of super table subquery
|
||||||
} else {
|
} else {
|
||||||
pSql->retry += 1;
|
pSql->retry += 1;
|
||||||
|
|
|
@ -2705,7 +2705,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
|
||||||
|
|
||||||
// release allocated resource
|
// release allocated resource
|
||||||
tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, pState->numOfSub);
|
tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, pState->numOfSub);
|
||||||
|
|
||||||
tscFreeRetrieveSup(pSql);
|
tscFreeRetrieveSup(pSql);
|
||||||
|
|
||||||
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
|
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
|
||||||
|
@ -2716,10 +2715,13 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
|
||||||
int32_t code = pParentSql->res.code;
|
int32_t code = pParentSql->res.code;
|
||||||
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) {
|
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) {
|
||||||
// remove the cached tableMeta and vgroup id list, and then parse the sql again
|
// remove the cached tableMeta and vgroup id list, and then parse the sql again
|
||||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentSql->cmd, 0);
|
SSqlCmd* pParentCmd = &pParentSql->cmd;
|
||||||
|
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0);
|
||||||
tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
|
tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
|
||||||
|
|
||||||
tscResetSqlCmd(&pParentSql->cmd, true);
|
pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap);
|
||||||
|
pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
|
|
||||||
pParentSql->res.code = TSDB_CODE_SUCCESS;
|
pParentSql->res.code = TSDB_CODE_SUCCESS;
|
||||||
pParentSql->retry++;
|
pParentSql->retry++;
|
||||||
|
|
||||||
|
|
|
@ -3829,13 +3829,64 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo handle the failure
|
|
||||||
static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
|
static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
|
||||||
|
SSqlObj* pSql = tres;
|
||||||
|
SRetrieveSupport* ps = param;
|
||||||
|
|
||||||
|
if (pSql->res.code != TSDB_CODE_SUCCESS) {
|
||||||
|
SSqlObj* pParentSql = ps->pParentSql;
|
||||||
|
|
||||||
|
int32_t index = ps->subqueryIndex;
|
||||||
|
bool ret = subAndCheckDone(pSql, pParentSql, index);
|
||||||
|
|
||||||
|
tfree(ps);
|
||||||
|
pSql->param = NULL;
|
||||||
|
|
||||||
|
if (!ret) {
|
||||||
|
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d completed, not all subquery finished", pParentSql->self, pSql->self, index);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo refactor
|
||||||
|
tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self);
|
||||||
|
|
||||||
|
SSqlCmd* pParentCmd = &pParentSql->cmd;
|
||||||
|
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0);
|
||||||
|
tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
|
||||||
|
|
||||||
|
pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap);
|
||||||
|
pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
|
|
||||||
|
pParentSql->res.code = TSDB_CODE_SUCCESS;
|
||||||
|
pParentSql->retry++;
|
||||||
|
|
||||||
|
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self,
|
||||||
|
tstrerror(code), pParentSql->retry);
|
||||||
|
|
||||||
|
code = tsParseSql(pParentSql, true);
|
||||||
|
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
pParentSql->res.code = code;
|
||||||
|
tscAsyncResultOnError(pParentSql);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SQueryInfo *pQueryInfo = tscGetQueryInfo(pParentCmd);
|
||||||
|
executeQuery(pParentSql, pQueryInfo);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param);
|
taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param);
|
||||||
}
|
}
|
||||||
|
|
||||||
// do execute the query according to the query execution plan
|
// do execute the query according to the query execution plan
|
||||||
void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
|
void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t numOfInit = 0;
|
||||||
|
|
||||||
if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
|
if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
|
||||||
(*pSql->fp)(pSql->param, pSql, 0);
|
(*pSql->fp)(pSql->param, pSql, 0);
|
||||||
return;
|
return;
|
||||||
|
@ -3850,7 +3901,12 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
|
||||||
|
|
||||||
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
|
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
|
||||||
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
|
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
|
||||||
pthread_mutex_init(&pSql->subState.mutex, NULL);
|
code = pthread_mutex_init(&pSql->subState.mutex, NULL);
|
||||||
|
|
||||||
|
if (pSql->pSubs == NULL || pSql->subState.states == NULL || code != TSDB_CODE_SUCCESS) {
|
||||||
|
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
|
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
|
||||||
SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i);
|
SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i);
|
||||||
|
@ -3858,45 +3914,69 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
|
||||||
pSql->cmd.active = pSub;
|
pSql->cmd.active = pSub;
|
||||||
pSql->cmd.command = TSDB_SQL_SELECT;
|
pSql->cmd.command = TSDB_SQL_SELECT;
|
||||||
|
|
||||||
// TODO handle memory failure
|
|
||||||
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
|
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
|
||||||
if (pNew == NULL) {
|
if (pNew == NULL) {
|
||||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
// return NULL;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pNew->pTscObj = pSql->pTscObj;
|
pNew->pTscObj = pSql->pTscObj;
|
||||||
pNew->signature = pNew;
|
pNew->signature = pNew;
|
||||||
pNew->sqlstr = strdup(pSql->sqlstr); // todo refactor
|
pNew->sqlstr = strdup(pSql->sqlstr);
|
||||||
pNew->fp = tscSubqueryCompleteCallback;
|
pNew->fp = tscSubqueryCompleteCallback;
|
||||||
|
pNew->maxRetry = pSql->maxRetry;
|
||||||
tsem_init(&pNew->rspSem, 0, 0);
|
tsem_init(&pNew->rspSem, 0, 0);
|
||||||
|
|
||||||
SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id
|
SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id
|
||||||
|
if (ps == NULL) {
|
||||||
|
tscFreeSqlObj(pNew);
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
ps->pParentSql = pSql;
|
ps->pParentSql = pSql;
|
||||||
ps->subqueryIndex = i;
|
ps->subqueryIndex = i;
|
||||||
|
|
||||||
pNew->param = ps;
|
pNew->param = ps;
|
||||||
pSql->pSubs[i] = pNew;
|
pSql->pSubs[i] = pNew;
|
||||||
registerSqlObj(pNew);
|
|
||||||
|
|
||||||
SSqlCmd* pCmd = &pNew->cmd;
|
SSqlCmd* pCmd = &pNew->cmd;
|
||||||
pCmd->command = TSDB_SQL_SELECT;
|
pCmd->command = TSDB_SQL_SELECT;
|
||||||
if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) {
|
if ((code = tscAddQueryInfo(pCmd)) != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd);
|
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd);
|
||||||
tscQueryInfoCopy(pNewQueryInfo, pSub);
|
tscQueryInfoCopy(pNewQueryInfo, pSub);
|
||||||
|
numOfInit++;
|
||||||
// create sub query to handle the sub query.
|
}
|
||||||
executeQuery(pNew, pNewQueryInfo);
|
|
||||||
|
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
|
||||||
|
SSqlObj* psub = pSql->pSubs[i];
|
||||||
|
registerSqlObj(psub);
|
||||||
|
|
||||||
|
// create sub query to handle the sub query.
|
||||||
|
SQueryInfo* pq = tscGetQueryInfo(&psub->cmd);
|
||||||
|
executeQuery(psub, pq);
|
||||||
}
|
}
|
||||||
|
|
||||||
// merge sub query result and generate final results
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSql->cmd.active = pQueryInfo;
|
pSql->cmd.active = pQueryInfo;
|
||||||
doExecuteQuery(pSql, pQueryInfo);
|
doExecuteQuery(pSql, pQueryInfo);
|
||||||
|
return;
|
||||||
|
|
||||||
|
_error:
|
||||||
|
for(int32_t i = 0; i < numOfInit; ++i) {
|
||||||
|
SSqlObj* p = pSql->pSubs[i];
|
||||||
|
tscFreeSqlObj(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
pSql->res.code = code;
|
||||||
|
pSql->subState.numOfSub = 0; // not initialized sub query object will not be freed
|
||||||
|
tfree(pSql->subState.states);
|
||||||
|
tfree(pSql->pSubs);
|
||||||
|
tscAsyncResultOnError(pSql);
|
||||||
}
|
}
|
||||||
|
|
||||||
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) {
|
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) {
|
||||||
|
|
Loading…
Reference in New Issue