|
|
|
@ -55,6 +55,72 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void tscSpinLock(int32_t *lock) {
|
|
|
|
|
int i = 0;
|
|
|
|
|
while (atomic_val_compare_exchange_32(lock, 0, 1) != 0) {
|
|
|
|
|
if (++i % 100 == 0) {
|
|
|
|
|
sched_yield();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void tscSpinUnlock(int32_t *lock) {
|
|
|
|
|
if (atomic_val_compare_exchange_32(lock, 1, 0) != 1) {
|
|
|
|
|
assert(false);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) {
|
|
|
|
|
assert(idx < subState->numOfSub);
|
|
|
|
|
assert(subState->states);
|
|
|
|
|
|
|
|
|
|
tscSpinLock(&subState->subLock);
|
|
|
|
|
|
|
|
|
|
tscDebug("subquery:%p,%d state set to %d", pSql, idx, state);
|
|
|
|
|
|
|
|
|
|
subState->states[idx] = state;
|
|
|
|
|
|
|
|
|
|
tscSpinUnlock(&subState->subLock);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bool allSubqueryDone(SSubqueryState *subState) {
|
|
|
|
|
bool done = true;
|
|
|
|
|
|
|
|
|
|
//lock in caller
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < subState->numOfSub; i++) {
|
|
|
|
|
if (0 == subState->states[i]) {
|
|
|
|
|
tscDebug("subquery:%d is NOT finished, total:%d", i, subState->numOfSub);
|
|
|
|
|
done = false;
|
|
|
|
|
break;
|
|
|
|
|
} else {
|
|
|
|
|
tscDebug("subquery:%d is finished, total:%d", i, subState->numOfSub);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return done;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) {
|
|
|
|
|
assert(idx < subState->numOfSub);
|
|
|
|
|
|
|
|
|
|
tscSpinLock(&subState->subLock);
|
|
|
|
|
|
|
|
|
|
tscDebug("subquery:%p,%d state set to 1", pSql, idx);
|
|
|
|
|
|
|
|
|
|
subState->states[idx] = 1;
|
|
|
|
|
|
|
|
|
|
bool done = allSubqueryDone(subState);
|
|
|
|
|
|
|
|
|
|
tscSpinUnlock(&subState->subLock);
|
|
|
|
|
|
|
|
|
|
return done;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) {
|
|
|
|
|
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
|
|
|
|
|
|
|
|
|
@ -367,10 +433,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
|
|
|
|
|
// scan all subquery, if one sub query has only ts, ignore it
|
|
|
|
|
tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub);
|
|
|
|
|
|
|
|
|
|
//the subqueries that do not actually launch the secondary query to virtual node is set as completed.
|
|
|
|
|
SSubqueryState* pState = &pSql->subState;
|
|
|
|
|
pState->numOfRemain = numOfSub;
|
|
|
|
|
|
|
|
|
|
bool success = true;
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
|
|
|
|
@ -403,6 +465,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
|
|
|
|
|
success = false;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
subquerySetState(pNew, &pSql->subState, i, 0);
|
|
|
|
|
|
|
|
|
|
tscClearSubqueryInfo(&pNew->cmd);
|
|
|
|
|
pSql->pSubs[i] = pNew;
|
|
|
|
@ -517,23 +581,23 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
|
|
|
|
|
SJoinSupporter* p = pSub->param;
|
|
|
|
|
tscDestroyJoinSupporter(p);
|
|
|
|
|
|
|
|
|
|
if (pSub->res.code == TSDB_CODE_SUCCESS) {
|
|
|
|
|
taos_free_result(pSub);
|
|
|
|
|
}
|
|
|
|
|
taos_free_result(pSub);
|
|
|
|
|
pSql->pSubs[i] = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tfree(pSql->subState.states);
|
|
|
|
|
|
|
|
|
|
pSql->subState.numOfSub = 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
|
|
|
|
|
assert(pSqlObj->subState.numOfRemain > 0);
|
|
|
|
|
|
|
|
|
|
if (atomic_sub_fetch_32(&pSqlObj->subState.numOfRemain, 1) <= 0) {
|
|
|
|
|
tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code));
|
|
|
|
|
static void quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
|
|
|
|
|
if (subAndCheckDone(pSqlSub, &pSqlObj->subState, pSupporter->subqueryIndex)) {
|
|
|
|
|
tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code));
|
|
|
|
|
freeJoinSubqueryObj(pSqlObj);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tscDestroyJoinSupporter(pSupporter);
|
|
|
|
|
//tscDestroyJoinSupporter(pSupporter);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// update the query time range according to the join results on timestamp
|
|
|
|
@ -785,7 +849,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
|
|
|
|
|
tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
|
|
|
|
|
|
|
|
|
|
pParentSql->res.code = numOfRows;
|
|
|
|
|
quitAllSubquery(pParentSql, pSupporter);
|
|
|
|
|
quitAllSubquery(pSql, pParentSql, pSupporter);
|
|
|
|
|
|
|
|
|
|
tscAsyncResultOnError(pParentSql);
|
|
|
|
|
return;
|
|
|
|
@ -802,7 +866,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
|
|
|
|
|
tscError("%p failed to malloc memory", pSql);
|
|
|
|
|
|
|
|
|
|
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
|
|
|
|
|
quitAllSubquery(pParentSql, pSupporter);
|
|
|
|
|
quitAllSubquery(pSql, pParentSql, pSupporter);
|
|
|
|
|
|
|
|
|
|
tscAsyncResultOnError(pParentSql);
|
|
|
|
|
return;
|
|
|
|
@ -844,9 +908,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
|
|
|
|
|
|
|
|
|
|
// no data exists in next vnode, mark the <tid, tags> query completed
|
|
|
|
|
// only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets.
|
|
|
|
|
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) {
|
|
|
|
|
if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) {
|
|
|
|
|
tscDebug("%p tagRetrieve:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SArray *s1 = NULL, *s2 = NULL;
|
|
|
|
|
int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2);
|
|
|
|
@ -891,7 +956,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
|
|
|
|
|
((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo2->pVgroupTables);
|
|
|
|
|
|
|
|
|
|
pParentSql->subState.numOfSub = 2;
|
|
|
|
|
pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub;
|
|
|
|
|
memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub);
|
|
|
|
|
|
|
|
|
|
for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) {
|
|
|
|
|
SSqlObj* sub = pParentSql->pSubs[m];
|
|
|
|
@ -922,7 +987,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
|
|
|
|
|
tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
|
|
|
|
|
|
|
|
|
|
pParentSql->res.code = numOfRows;
|
|
|
|
|
quitAllSubquery(pParentSql, pSupporter);
|
|
|
|
|
quitAllSubquery(pSql, pParentSql, pSupporter);
|
|
|
|
|
|
|
|
|
|
tscAsyncResultOnError(pParentSql);
|
|
|
|
|
return;
|
|
|
|
@ -937,7 +1002,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
|
|
|
|
|
|
|
|
|
|
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
|
|
|
|
|
|
|
|
|
|
quitAllSubquery(pParentSql, pSupporter);
|
|
|
|
|
quitAllSubquery(pSql, pParentSql, pSupporter);
|
|
|
|
|
|
|
|
|
|
tscAsyncResultOnError(pParentSql);
|
|
|
|
|
|
|
|
|
@ -955,7 +1020,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
|
|
|
|
|
|
|
|
|
|
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
|
|
|
|
|
|
|
|
|
|
quitAllSubquery(pParentSql, pSupporter);
|
|
|
|
|
quitAllSubquery(pSql, pParentSql, pSupporter);
|
|
|
|
|
|
|
|
|
|
tscAsyncResultOnError(pParentSql);
|
|
|
|
|
|
|
|
|
@ -1009,9 +1074,9 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) {
|
|
|
|
|
if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tscDebug("%p all subquery retrieve ts complete, do ts block intersect", pParentSql);
|
|
|
|
|
|
|
|
|
@ -1088,9 +1153,8 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
assert(pState->numOfRemain > 0);
|
|
|
|
|
if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) {
|
|
|
|
|
tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfSub);
|
|
|
|
|
if (!subAndCheckDone(pSql, pState, pSupporter->subqueryIndex)) {
|
|
|
|
|
tscDebug("%p sub:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pState->numOfSub);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1205,16 +1269,6 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// get the number of subquery that need to retrieve the next vnode.
|
|
|
|
|
if (orderedPrjQuery) {
|
|
|
|
|
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
|
|
|
|
|
SSqlObj* pSub = pSql->pSubs[i];
|
|
|
|
|
if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) {
|
|
|
|
|
pSql->subState.numOfRemain++;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
|
|
|
|
|
SSqlObj* pSub = pSql->pSubs[i];
|
|
|
|
|
if (pSub == NULL) {
|
|
|
|
@ -1242,9 +1296,13 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
|
|
|
|
|
pSub->cmd.command = TSDB_SQL_SELECT;
|
|
|
|
|
pSub->fp = tscJoinQueryCallback;
|
|
|
|
|
|
|
|
|
|
subquerySetState(pSub, &pSql->subState, i, 0);
|
|
|
|
|
|
|
|
|
|
tscProcessSql(pSub);
|
|
|
|
|
tryNextVnode = true;
|
|
|
|
|
} else {
|
|
|
|
|
subquerySetState(pSub, &pSql->subState, i, 1);
|
|
|
|
|
|
|
|
|
|
tscDebug("%p no result in current subquery anymore", pSub);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1270,7 +1328,19 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
|
|
|
|
|
// retrieve data from current vnode.
|
|
|
|
|
tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch);
|
|
|
|
|
SJoinSupporter* pSupporter = NULL;
|
|
|
|
|
pSql->subState.numOfRemain = numOfFetch;
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
|
|
|
|
|
SSqlObj* pSql1 = pSql->pSubs[i];
|
|
|
|
|
if (pSql1 == NULL) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SSqlRes* pRes1 = &pSql1->res;
|
|
|
|
|
|
|
|
|
|
if (pRes1->row >= pRes1->numOfRows) {
|
|
|
|
|
subquerySetState(pSql1, &pSql->subState, i, 0);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
|
|
|
|
|
SSqlObj* pSql1 = pSql->pSubs[i];
|
|
|
|
@ -1370,7 +1440,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
|
|
|
|
|
// retrieve actual query results from vnode during the second stage join subquery
|
|
|
|
|
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code);
|
|
|
|
|
quitAllSubquery(pParentSql, pSupporter);
|
|
|
|
|
quitAllSubquery(pSql, pParentSql, pSupporter);
|
|
|
|
|
tscAsyncResultOnError(pParentSql);
|
|
|
|
|
|
|
|
|
|
return;
|
|
|
|
@ -1383,7 +1453,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
|
|
|
|
|
tscError("%p abort query, code:%s, global code:%s", pSql, tstrerror(code), tstrerror(pParentSql->res.code));
|
|
|
|
|
pParentSql->res.code = code;
|
|
|
|
|
|
|
|
|
|
quitAllSubquery(pParentSql, pSupporter);
|
|
|
|
|
quitAllSubquery(pSql, pParentSql, pSupporter);
|
|
|
|
|
tscAsyncResultOnError(pParentSql);
|
|
|
|
|
|
|
|
|
|
return;
|
|
|
|
@ -1410,9 +1480,9 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
|
|
|
|
|
|
|
|
|
|
// In case of consequence query from other vnode, do not wait for other query response here.
|
|
|
|
|
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) {
|
|
|
|
|
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) {
|
|
|
|
|
if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tscSetupOutputColumnIndex(pParentSql);
|
|
|
|
@ -1424,6 +1494,9 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
|
|
|
|
|
if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
|
|
|
|
|
pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data
|
|
|
|
|
pSql->cmd.command = TSDB_SQL_FETCH;
|
|
|
|
|
|
|
|
|
|
subquerySetState(pSql, &pParentSql->subState, pSupporter->subqueryIndex, 0);
|
|
|
|
|
|
|
|
|
|
tscProcessSql(pSql);
|
|
|
|
|
} else { // first retrieve from vnode during the secondary stage sub-query
|
|
|
|
|
// set the command flag must be after the semaphore been correctly set.
|
|
|
|
@ -1459,8 +1532,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
|
|
|
|
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pSql->pSubs[pSql->subState.numOfRemain++] = pNew;
|
|
|
|
|
assert(pSql->subState.numOfRemain <= pSql->subState.numOfSub);
|
|
|
|
|
pSql->pSubs[tableIndex] = pNew;
|
|
|
|
|
|
|
|
|
|
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
|
|
|
|
|
addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);
|
|
|
|
@ -1592,6 +1664,14 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
|
|
|
|
|
int32_t code = TSDB_CODE_SUCCESS;
|
|
|
|
|
pSql->subState.numOfSub = pQueryInfo->numOfTables;
|
|
|
|
|
|
|
|
|
|
if (pSql->subState.states == NULL) {
|
|
|
|
|
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states));
|
|
|
|
|
if (pSql->subState.states == NULL) {
|
|
|
|
|
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
|
|
|
goto _error;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool hasEmptySub = false;
|
|
|
|
|
|
|
|
|
|
tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables);
|
|
|
|
@ -1627,7 +1707,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
|
|
|
|
|
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
|
|
|
|
|
SSqlObj* pSub = pSql->pSubs[i];
|
|
|
|
|
if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
pSql->subState.numOfRemain = i - 1; // the already sent request will continue and do not go to the error process routine
|
|
|
|
|
memset(pSql->subState.states + i, 1, sizeof(*pSql->subState.states) * (pSql->subState.numOfSub - i)); // the already sent request will continue and do not go to the error process routine
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1711,7 +1791,18 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pState->numOfRemain = pState->numOfSub;
|
|
|
|
|
if (pState->states == NULL) {
|
|
|
|
|
pState->states = calloc(pState->numOfSub, sizeof(*pState->states));
|
|
|
|
|
if (pState->states == NULL) {
|
|
|
|
|
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
|
|
|
tscAsyncResultOnError(pSql);
|
|
|
|
|
tfree(pMemoryBuf);
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub);
|
|
|
|
|
|
|
|
|
|
pRes->code = TSDB_CODE_SUCCESS;
|
|
|
|
|
|
|
|
|
|
int32_t i = 0;
|
|
|
|
@ -1860,7 +1951,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
|
|
|
|
|
assert(pSql != NULL);
|
|
|
|
|
|
|
|
|
|
SSubqueryState* pState = &pParentSql->subState;
|
|
|
|
|
assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0);
|
|
|
|
|
|
|
|
|
|
// retrieved in subquery failed. OR query cancelled in retrieve phase.
|
|
|
|
|
if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) {
|
|
|
|
@ -1891,14 +1981,12 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t remain = -1;
|
|
|
|
|
if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) {
|
|
|
|
|
tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
|
|
|
|
|
pState->numOfSub - remain);
|
|
|
|
|
if (!subAndCheckDone(pSql, pState, subqueryIndex)) {
|
|
|
|
|
tscDebug("%p sub:%p,%d freed, not finished, total:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub);
|
|
|
|
|
|
|
|
|
|
tscFreeRetrieveSup(pSql);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// all subqueries are failed
|
|
|
|
|
tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfSub,
|
|
|
|
@ -1963,14 +2051,12 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t remain = -1;
|
|
|
|
|
if ((remain = atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1)) > 0) {
|
|
|
|
|
tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
|
|
|
|
|
pState->numOfSub - remain);
|
|
|
|
|
if (!subAndCheckDone(pSql, &pParentSql->subState, idx)) {
|
|
|
|
|
tscDebug("%p sub:%p orderOfSub:%d freed, not finished", pParentSql, pSql, trsupport->subqueryIndex);
|
|
|
|
|
|
|
|
|
|
tscFreeRetrieveSup(pSql);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// all sub-queries are returned, start to local merge process
|
|
|
|
|
pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;
|
|
|
|
@ -2016,7 +2102,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
|
|
|
|
|
SSqlObj * pParentSql = trsupport->pParentSql;
|
|
|
|
|
|
|
|
|
|
SSubqueryState* pState = &pParentSql->subState;
|
|
|
|
|
assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0);
|
|
|
|
|
|
|
|
|
|
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
|
|
|
|
|
SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
|
|
|
|
@ -2237,7 +2322,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) {
|
|
|
|
|
if (!subAndCheckDone(tres, &pParentObj->subState, pSupporter->index)) {
|
|
|
|
|
tscDebug("%p insert:%p,%d completed, total:%d", pParentObj, tres, pSupporter->index, pParentObj->subState.numOfSub);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -2271,6 +2357,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
|
|
|
|
|
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0);
|
|
|
|
|
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
|
|
|
|
|
|
|
|
|
|
subquerySetState(pSql, &pParentObj->subState, i, 0);
|
|
|
|
|
|
|
|
|
|
tscDebug("%p, failed sub:%d, %p", pParentObj, i, pSql);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -2285,7 +2373,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pParentObj->cmd.parseFinished = false;
|
|
|
|
|
pParentObj->subState.numOfRemain = numOfFailed;
|
|
|
|
|
|
|
|
|
|
tscResetSqlCmdObj(&pParentObj->cmd);
|
|
|
|
|
|
|
|
|
@ -2361,7 +2448,16 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
|
|
|
|
|
// the number of already initialized subqueries
|
|
|
|
|
int32_t numOfSub = 0;
|
|
|
|
|
|
|
|
|
|
pSql->subState.numOfRemain = pSql->subState.numOfSub;
|
|
|
|
|
if (pSql->subState.states == NULL) {
|
|
|
|
|
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states));
|
|
|
|
|
if (pSql->subState.states == NULL) {
|
|
|
|
|
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
|
|
|
goto _error;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
|
|
|
|
|
|
|
|
|
|
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
|
|
|
|
|
if (pSql->pSubs == NULL) {
|
|
|
|
|
goto _error;
|
|
|
|
|