[td-1250]
This commit is contained in:
parent
580448e381
commit
9b75283859
|
@ -30,7 +30,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
|
|||
|
||||
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
|
||||
|
||||
int32_t tscHandleMasterJoinQuery(SSqlObj* pSql);
|
||||
void tscHandleMasterJoinQuery(SSqlObj* pSql);
|
||||
|
||||
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql);
|
||||
|
||||
|
|
|
@ -1159,7 +1159,7 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);
|
|||
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
|
||||
|
||||
// TODO
|
||||
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
|
||||
int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
|
||||
SSqlCmd * pCmd = &pSql->cmd;
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
|
||||
|
@ -1304,52 +1304,75 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
|
|||
pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
|
||||
}
|
||||
|
||||
return tscProcessSql(pNew);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
|
||||
void tscHandleMasterJoinQuery(SSqlObj* pSql) {
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
SSqlRes* pRes = &pSql->res;
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
// todo add test
|
||||
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
|
||||
if (pState == NULL) {
|
||||
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
return pSql->res.code;
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pState->numOfTotal = pQueryInfo->numOfTables;
|
||||
pState->numOfRemain = pState->numOfTotal;
|
||||
|
||||
bool hasEmptySub = false;
|
||||
|
||||
tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables);
|
||||
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
|
||||
|
||||
if (pSupporter == NULL) { // failed to create support struct, abort current query
|
||||
tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
|
||||
pState->numOfRemain = i;
|
||||
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
if (0 == i) {
|
||||
taosTFree(pState);
|
||||
}
|
||||
return pSql->res.code;
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter);
|
||||
code = tscCreateJoinSubquery(pSql, i, pSupporter);
|
||||
if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query
|
||||
tscDestroyJoinSupporter(pSupporter);
|
||||
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
if (0 == i) {
|
||||
taosTFree(pState);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SSqlObj* pSub = pSql->pSubs[i];
|
||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSub->cmd, 0, 0);
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && (pTableMetaInfo->vgroupList->numOfVgroups == 0)) {
|
||||
hasEmptySub = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pSql->cmd.command = (pSql->numOfSubs <= 0)? TSDB_SQL_RETRIEVE_EMPTY_RESULT:TSDB_SQL_TABLE_JOIN_RETRIEVE;
|
||||
if (hasEmptySub) { // at least one subquery is empty, do nothing and return
|
||||
freeJoinSubqueryObj(pSql);
|
||||
pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||
(*pSql->fp)(pSql->param, pSql, 0);
|
||||
} else {
|
||||
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
|
||||
SSqlObj* pSub = pSql->pSubs[i];
|
||||
if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) {
|
||||
pState->numOfRemain = i - 1; // the already sent reques will continue and do not go to the error process routine
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
pSql->cmd.command = TSDB_SQL_TABLE_JOIN_RETRIEVE;
|
||||
}
|
||||
|
||||
return;
|
||||
|
||||
_error:
|
||||
pRes->code = code;
|
||||
tscQueueAsyncRes(pSql);
|
||||
}
|
||||
|
||||
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) {
|
||||
|
|
|
@ -482,15 +482,31 @@ sql insert into um2 using m2 tags(9) values(1000001, 10)(2000000, 20);
|
|||
|
||||
sql_error select count(*) from m1,m2 where m1.a=m2.a and m1.ts=m2.ts;
|
||||
|
||||
#empty table join test, add for no result join test
|
||||
print ====> empty table/empty super-table join test, add for no result join test
|
||||
sql create database ux1;
|
||||
sql use ux1;
|
||||
sql create table m1(ts timestamp, k int) tags(a binary(12), b int);
|
||||
sql create table tm0 using m1 tags('abc', 1);
|
||||
sql create table m2(ts timestamp, k int) tags(a int, b binary(12));
|
||||
|
||||
sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a;
|
||||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql create table tm2 using m2 tags(2, 'abc');
|
||||
sql select count(*) from tm0, tm2 where tm0.ts=tm2.ts;
|
||||
sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a
|
||||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a;
|
||||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql drop table tm2;
|
||||
sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a;
|
||||
sql drop database ux1;
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue