Merge pull request #2340 from taosdata/feature/query

Feature/query
This commit is contained in:
Shengliang Guan 2020-06-18 16:29:18 +08:00 committed by GitHub
commit c196808306
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 471 additions and 458 deletions

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_TSCSECONARYMERGE_H #ifndef TDENGINE_TSCLOCALMERGE_H
#define TDENGINE_TSCSECONARYMERGE_H #define TDENGINE_TSCLOCALMERGE_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -28,13 +28,6 @@ extern "C" {
#define MAX_NUM_OF_SUBQUERY_RETRY 3 #define MAX_NUM_OF_SUBQUERY_RETRY 3
/*
* @version 0.1
* @date 2018/01/05
* @author liaohj
* management of client-side reducer for metric query
*/
struct SQLFunctionCtx; struct SQLFunctionCtx;
typedef struct SLocalDataSource { typedef struct SLocalDataSource {
@ -60,7 +53,6 @@ typedef struct SLocalReducer {
char * prevRowOfInput; char * prevRowOfInput;
tFilePage * pResultBuf; tFilePage * pResultBuf;
int32_t nResultBufSize; int32_t nResultBufSize;
// char * pBufForInterpo; // intermediate buffer for interpolation
tFilePage * pTempBuffer; tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx; struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result. int32_t rowSize; // size of each intermediate result.
@ -81,13 +73,8 @@ typedef struct SLocalReducer {
} SLocalReducer; } SLocalReducer;
typedef struct SSubqueryState { typedef struct SSubqueryState {
/* int32_t numOfRemain; // the number of remain unfinished subquery
* the number of completed retrieval subquery, once this value equals to numOfVnodes, int32_t numOfTotal; // the number of total sub-queries
* all retrieval are completed.Local merge is launched.
*/
int32_t numOfCompleted;
int32_t numOfTotal; // number of total sub-queries
int32_t code; // code from subqueries
uint64_t numOfRetrievedRows; // total number of points in this query uint64_t numOfRetrievedRows; // total number of points in this query
} SSubqueryState; } SSubqueryState;
@ -128,4 +115,4 @@ int32_t tscDoLocalMerge(SSqlObj *pSql);
} }
#endif #endif
#endif // TDENGINE_TSCSECONARYMERGE_H #endif // TDENGINE_TSCLOCALMERGE_H

View File

@ -26,11 +26,9 @@ extern "C" {
void tscFetchDatablockFromSubquery(SSqlObj* pSql); void tscFetchDatablockFromSubquery(SSqlObj* pSql);
void tscSetupOutputColumnIndex(SSqlObj* pSql); void tscSetupOutputColumnIndex(SSqlObj* pSql);
int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code); void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index); SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
void tscDestroyJoinSupporter(SJoinSupporter* pSupporter);
int32_t tscHandleMasterJoinQuery(SSqlObj* pSql); int32_t tscHandleMasterJoinQuery(SSqlObj* pSql);

View File

@ -64,7 +64,8 @@ typedef struct SJoinSupporter {
SSubqueryState* pState; SSubqueryState* pState;
SSqlObj* pObj; // parent SqlObj SSqlObj* pObj; // parent SqlObj
int32_t subqueryIndex; // index of sub query int32_t subqueryIndex; // index of sub query
int64_t interval; // interval time int64_t intervalTime; // interval time
int64_t slidingTime; // sliding time
SLimitVal limit; // limit info SLimitVal limit; // limit info
uint64_t uid; // query meter uid uint64_t uid; // query meter uid
SArray* colList; // previous query information, no need to use this attribute, and the corresponding attribution SArray* colList; // previous query information, no need to use this attribute, and the corresponding attribution

View File

@ -12,12 +12,12 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h"
#include "tscSubquery.h" #include "tscSubquery.h"
#include <qast.h> #include "qast.h"
#include <tcompare.h> #include "tcompare.h"
#include <tschemautil.h> #include "tschemautil.h"
#include "os.h"
#include "qtsbuf.h" #include "qtsbuf.h"
#include "tscLog.h" #include "tscLog.h"
#include "tsclient.h" #include "tsclient.h"
@ -169,7 +169,8 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in
pSupporter->subqueryIndex = index; pSupporter->subqueryIndex = index;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
pSupporter->interval = pQueryInfo->intervalTime; pSupporter->intervalTime = pQueryInfo->intervalTime;
pSupporter->slidingTime = pQueryInfo->slidingTime;
pSupporter->limit = pQueryInfo->limit; pSupporter->limit = pQueryInfo->limit;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index);
@ -186,7 +187,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in
return pSupporter; return pSupporter;
} }
void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) { static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
if (pSupporter == NULL) { if (pSupporter == NULL) {
return; return;
} }
@ -217,7 +218,7 @@ void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
* primary timestamp column , the secondary query is not necessary * primary timestamp column , the secondary query is not necessary
* *
*/ */
bool needSecondaryQuery(SQueryInfo* pQueryInfo) { static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
@ -233,14 +234,11 @@ bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
/* /*
* launch secondary stage query to fetch the result that contains timestamp in set * launch secondary stage query to fetch the result that contains timestamp in set
*/ */
int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { static int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
int32_t numOfSub = 0; int32_t numOfSub = 0;
SJoinSupporter* pSupporter = NULL; SJoinSupporter* pSupporter = NULL;
/* //If the columns are not involved in the final select clause, the corresponding query will not be issued.
* If the columns are not involved in the final select clause,
* the corresponding query will not be issued.
*/
for (int32_t i = 0; i < pSql->numOfSubs; ++i) { for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
pSupporter = pSql->pSubs[i]->param; pSupporter = pSql->pSubs[i]->param;
if (taosArrayGetSize(pSupporter->exprList) > 0) { if (taosArrayGetSize(pSupporter->exprList) > 0) {
@ -251,13 +249,12 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
assert(numOfSub > 0); assert(numOfSub > 0);
// scan all subquery, if one sub query has only ts, ignore it // scan all subquery, if one sub query has only ts, ignore it
tscTrace("%p start to launch secondary subquery, total:%d, only:%d needs to query, others are not retrieve in " tscTrace("%p start to launch secondary subquery, total:%d, only:%d needs to query", pSql, pSql->numOfSubs, numOfSub);
"select clause", pSql, pSql->numOfSubs, numOfSub);
//the subqueries that do not actually launch the secondary query to virtual node is set as completed. //the subqueries that do not actually launch the secondary query to virtual node is set as completed.
SSubqueryState* pState = pSupporter->pState; SSubqueryState* pState = pSupporter->pState;
pState->numOfTotal = pSql->numOfSubs; pState->numOfTotal = pSql->numOfSubs;
pState->numOfCompleted = (pSql->numOfSubs - numOfSub); pState->numOfRemain = numOfSub;
bool success = true; bool success = true;
@ -301,7 +298,8 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
// set the second stage sub query for join process // set the second stage sub query for join process
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE); TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE);
pQueryInfo->intervalTime = pSupporter->interval; pQueryInfo->intervalTime = pSupporter->intervalTime;
pQueryInfo->slidingTime = pSupporter->slidingTime;
pQueryInfo->groupbyExpr = pSupporter->groupbyExpr; pQueryInfo->groupbyExpr = pSupporter->groupbyExpr;
tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond); tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
@ -375,6 +373,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
if (pSql->pSubs[i] == NULL) { if (pSql->pSubs[i] == NULL) {
continue; continue;
} }
tscDoQuery(pSql->pSubs[i]); tscDoQuery(pSql->pSubs[i]);
} }
@ -405,11 +404,9 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
} }
static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
int32_t numOfTotal = pSupporter->pState->numOfTotal; assert(pSupporter->pState->numOfRemain > 0);
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
pSqlObj->res.code = pSupporter->pState->code; if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) <= 0) {
if (finished >= numOfTotal) {
tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code); tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code);
freeJoinSubqueryObj(pSqlObj); freeJoinSubqueryObj(pSqlObj);
} }
@ -421,7 +418,7 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
pQueryInfo->window = *win; pQueryInfo->window = *win;
} }
static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) { static UNUSED_FUNC void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) {
SSqlObj* pParentSql = pSupporter->pObj; SSqlObj* pParentSql = pSupporter->pObj;
SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex); SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
@ -444,21 +441,6 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj*
// } // }
// } // }
int32_t numOfTotal = pSupporter->pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
if (finished >= numOfTotal) {
assert(finished == numOfTotal);
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, pSql,
pSupporter->subqueryIndex);
freeJoinSubqueryObj(pParentSql);
return;
}
tscTrace("%p all subqueries retrieve ts complete, do ts block intersect", pParentSql);
SJoinSupporter* p1 = pParentSql->pSubs[0]->param; SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param; SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
@ -472,7 +454,6 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj*
tscLaunchSecondPhaseSubqueries(pParentSql); tscLaunchSecondPhaseSubqueries(pParentSql);
} }
} }
}
int32_t tscCompareTidTags(const void* p1, const void* p2) { int32_t tscCompareTidTags(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*) varDataVal(p1); const STidTags* t1 = (const STidTags*) varDataVal(p1);
@ -545,10 +526,12 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL); tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);
// set the tags value for ts_comp function // set the tags value for ts_comp function
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0); SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0);
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid); int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid);
pExpr->param->i64Key = tagColId; pExpr->param->i64Key = tagColId;
pExpr->numOfParams = 1; pExpr->numOfParams = 1;
}
// add the filter tag column // add the filter tag column
if (pSupporter->colList != NULL) { if (pSupporter->colList != NULL) {
@ -575,7 +558,7 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
tscProcessSql(pSql); tscProcessSql(pSql);
} }
static bool checkForIdenticalTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1, void* pSql) { static bool checkForDuplicateTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1, SSqlObj* pPSqlObj) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);// todo: tags mismatch, tags not completed SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);// todo: tags mismatch, tags not completed
@ -587,8 +570,8 @@ static bool checkForIdenticalTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1,
STidTags* p = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize); STidTags* p = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
if (doCompare(prev->tag, p->tag, pColSchema->type, pColSchema->bytes) == 0) { if (doCompare(prev->tag, p->tag, pColSchema->type, pColSchema->bytes) == 0) {
tscError("%p join tags have same value for different table, free all sub SqlObj and quit", pSql); tscError("%p join tags have same value for different table, free all sub SqlObj and quit", pPSqlObj);
p1->pState->code = TSDB_CODE_QRY_DUP_JOIN_KEY; pPSqlObj->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY;
return false; return false;
} }
} }
@ -596,8 +579,8 @@ static bool checkForIdenticalTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1,
return true; return true;
} }
static void getIntersectionOfTagVal(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) { static void getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) {
tscTrace("%p all subqueries retrieve tags complete, do tags match", pParentSql); tscTrace("%p all subqueries retrieve <tid, tags> complete, do tags match", pParentSql);
SJoinSupporter* p1 = pParentSql->pSubs[0]->param; SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param; SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
@ -613,7 +596,7 @@ static void getIntersectionOfTagVal(SQueryInfo* pQueryInfo, SSqlObj* pParentSql,
*s1 = taosArrayInit(p1->num, p1->tagSize); *s1 = taosArrayInit(p1->num, p1->tagSize);
*s2 = taosArrayInit(p2->num, p2->tagSize); *s2 = taosArrayInit(p2->num, p2->tagSize);
if (!(checkForIdenticalTagVal(pQueryInfo, p1, pParentSql) && checkForIdenticalTagVal(pQueryInfo, p2, pParentSql))) { if (!(checkForDuplicateTagVal(pQueryInfo, p1, pParentSql) && checkForDuplicateTagVal(pQueryInfo, p2, pParentSql))) {
freeJoinSubqueryObj(pParentSql); freeJoinSubqueryObj(pParentSql);
pParentSql->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY; pParentSql->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY;
tscQueueAsyncRes(pParentSql); tscQueueAsyncRes(pParentSql);
@ -642,7 +625,7 @@ static void getIntersectionOfTagVal(SQueryInfo* pQueryInfo, SSqlObj* pParentSql,
} }
} }
static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param; SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj; SSqlObj* pParentSql = pSupporter->pObj;
@ -652,25 +635,53 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY));
// response of tag retrieve // check for the error code firstly
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) { if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
// todo handle error // todo retry if other subqueries are not failed
if (numOfRows == 0 || pRes->completed) { assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
pParentSql->res.code = numOfRows;
quitAllSubquery(pParentSql, pSupporter);
tscQueueAsyncRes(pParentSql);
return;
}
// keep the results in memory
if (numOfRows > 0) { if (numOfRows > 0) {
size_t validLen = pSupporter->tagSize * pRes->numOfRows; size_t validLen = pSupporter->tagSize * pRes->numOfRows;
size_t length = pSupporter->totalLen + validLen; size_t length = pSupporter->totalLen + validLen;
// todo handle memory error
char* tmp = realloc(pSupporter->pIdTagList, length); char* tmp = realloc(pSupporter->pIdTagList, length);
assert(tmp != NULL); if (tmp == NULL) {
tscError("%p failed to malloc memory", pSql);
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
quitAllSubquery(pParentSql, pSupporter);
tscQueueAsyncRes(pParentSql);
return;
}
pSupporter->pIdTagList = tmp; pSupporter->pIdTagList = tmp;
memcpy(pSupporter->pIdTagList + pSupporter->totalLen, pRes->data, validLen); memcpy(pSupporter->pIdTagList + pSupporter->totalLen, pRes->data, validLen);
pSupporter->totalLen += validLen; pSupporter->totalLen += validLen;
pSupporter->num += pRes->numOfRows; pSupporter->num += pRes->numOfRows;
// query not completed, continue to retrieve tid + tag tuples
if (!pRes->completed) {
taos_fetch_rows_a(tres, tidTagRetrieveCallback, param);
return;
}
} }
// all data in the current vnode has been returned to the client; try the next vnode if one exists
// <tid + tag> tuples have been retrieved to client, try <tid + tag> tuples from the next vnode // <tid + tag> tuples have been retrieved to client, try <tid + tag> tuples from the next vnode
if (hasMoreVnodesToTry(pSql)) { if (hasMoreVnodesToTry(pSql)) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
@ -680,8 +691,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
assert(pTableMetaInfo->vgroupIndex < totalVgroups); assert(pTableMetaInfo->vgroupIndex < totalVgroups);
tscTrace("%p tid_tag from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d", tscTrace("%p tid_tag from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pSupporter->num);
pSupporter->num);
pCmd->command = TSDB_SQL_SELECT; pCmd->command = TSDB_SQL_SELECT;
tscResetForNextRetrieve(&pSql->res); tscResetForNextRetrieve(&pSql->res);
@ -692,23 +702,21 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
return; return;
} }
int32_t numOfTotal = pSupporter->pState->numOfTotal; // no data exists in next vnode, mark the <tid, tags> query completed
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); // only when no subquery remains, proceed to compute the intersection of the <tid, tags> tuple sets.
if (finished < numOfTotal) { if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) {
return; return;
} }
// all subquery are returned, start to compare the tags
assert(finished == numOfTotal);
SArray *s1 = NULL, *s2 = NULL; SArray *s1 = NULL, *s2 = NULL;
getIntersectionOfTagVal(pQueryInfo, pParentSql, &s1, &s2); getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2);
if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return. if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return.
tscTrace("%p free all sub SqlObj and quit", pParentSql); tscTrace("%p free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql); freeJoinSubqueryObj(pParentSql);
return; return;
} else { }
// proceed to the ts_comp query
SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd; SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd;
SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd; SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd;
@ -720,50 +728,37 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0); STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0);
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2); tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2);
pSupporter->pState->numOfCompleted = 0;
pSupporter->pState->code = 0;
pSupporter->pState->numOfTotal = 2; pSupporter->pState->numOfTotal = 2;
pSupporter->pState->numOfRemain = pSupporter->pState->numOfTotal;
for (int32_t m = 0; m < pParentSql->numOfSubs; ++m) { for (int32_t m = 0; m < pParentSql->numOfSubs; ++m) {
SSqlObj* psub = pParentSql->pSubs[m]; SSqlObj* sub = pParentSql->pSubs[m];
issueTSCompQuery(psub, psub->param, pParentSql); issueTSCompQuery(sub, sub->param, pParentSql);
} }
} }
} else { static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
if (numOfRows < 0) { // error SJoinSupporter* pSupporter = (SJoinSupporter*)param;
pSupporter->pState->code = numOfRows;
quitAllSubquery(pParentSql, pSupporter); SSqlObj* pParentSql = pSupporter->pObj;
SSqlObj* pSql = (SSqlObj*)tres;
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE));
// check for the error code firstly
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
// todo retry if other subqueries are not failed yet
assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
pParentSql->res.code = numOfRows; pParentSql->res.code = numOfRows;
tscQueueAsyncRes(pParentSql);
return;
}
size_t length = pSupporter->totalLen + pRes->rspLen;
assert(length > 0);
char* tmp = realloc(pSupporter->pIdTagList, length);
assert(tmp != NULL);
pSupporter->pIdTagList = tmp;
memcpy(pSupporter->pIdTagList, pRes->data, pRes->rspLen);
pSupporter->totalLen += pRes->rspLen;
pSupporter->num += pRes->numOfRows;
// continue retrieve data from vnode
taos_fetch_rows_a(tres, joinRetrieveCallback, param);
}
return;
}
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
if (numOfRows < 0) {
tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
pSupporter->pState->code = numOfRows;
quitAllSubquery(pParentSql, pSupporter); quitAllSubquery(pParentSql, pSupporter);
tscQueueAsyncRes(pParentSql);
return; return;
} }
@ -776,8 +771,9 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
if (pBuf == NULL) { // in error process, close the fd if (pBuf == NULL) { // in error process, close the fd
tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows); tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);
pSupporter->pState->code = TSDB_CODE_TSC_APP_ERROR; // todo set the informative code pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
quitAllSubquery(pParentSql, pSupporter); tscQueueAsyncRes(pParentSql);
return; return;
} }
@ -791,9 +787,18 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex); tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex);
tsBufDestory(pBuf); tsBufDestory(pBuf);
} }
// continue to retrieve ts-comp data from vnode
if (!pRes->completed) {
getTmpfilePath("ts-join", pSupporter->path);
pSupporter->f = fopen(pSupporter->path, "w");
pRes->row = pRes->numOfRows;
taos_fetch_rows_a(tres, tsCompRetrieveCallback, param);
return;
}
} }
if (pRes->completed) {
if (hasMoreVnodesToTry(pSql)) { if (hasMoreVnodesToTry(pSql)) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
@ -810,6 +815,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
assert(pSupporter->f == NULL); assert(pSupporter->f == NULL);
getTmpfilePath("ts-join", pSupporter->path); getTmpfilePath("ts-join", pSupporter->path);
// TODO check for failure
pSupporter->f = fopen(pSupporter->path, "w"); pSupporter->f = fopen(pSupporter->path, "w");
pRes->row = pRes->numOfRows; pRes->row = pRes->numOfRows;
@ -817,21 +824,51 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
pSql->fp = tscJoinQueryCallback; pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql); tscProcessSql(pSql);
return; return;
} else {
tSIntersectionAndLaunchSecQuery(pSupporter, pSql);
} }
} else { // open a new file to save the incoming result if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) {
getTmpfilePath("ts-join", pSupporter->path); return;
pSupporter->f = fopen(pSupporter->path, "w");
pRes->row = pRes->numOfRows;
taos_fetch_rows_a(tres, joinRetrieveCallback, param);
} }
} else { // secondary stage retrieve, driven by taos_fetch_row or other functions
if (numOfRows < 0) { tscTrace("%p all subquery retrieve ts complete, do ts block intersect", pParentSql);
pSupporter->pState->code = numOfRows;
tscError("%p retrieve failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); // proceeds to launched secondary query to retrieve final data
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
STimeWindow win = TSWINDOW_INITIALIZER;
int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &win);
if (num <= 0) { // no result during ts intersect
tscTrace("%p no results generated in ts intersection, free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql);
return;
}
// launch the query to retrieve the actual results from the vnode along with the filtered timestamps
SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
updateQueryTimeRange(pPQueryInfo, &win);
tscLaunchSecondPhaseSubqueries(pParentSql);
}
static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
SSubqueryState* pState = pSupporter->pState;
SSqlObj* pSql = (SSqlObj*)tres;
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// TODO put to async res?
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
assert(numOfRows == taos_errno(pSql));
pParentSql->res.code = numOfRows;
tscError("%p retrieve failed, index:%d, code:%s", pSql, pSupporter->subqueryIndex, tstrerror(numOfRows));
} }
if (numOfRows >= 0) { if (numOfRows >= 0) {
@ -844,8 +881,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
// for projection query, need to try next vnode if current vnode is exhausted // for projection query, need to try next vnode if current vnode is exhausted
if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) { if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) {
pSupporter->pState->numOfCompleted = 0; pState->numOfRemain = 1;
pSupporter->pState->numOfTotal = 1; pState->numOfTotal = 1;
pSql->cmd.command = TSDB_SQL_SELECT; pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback; pSql->fp = tscJoinQueryCallback;
@ -855,16 +892,14 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
} }
} }
int32_t numOfTotal = pSupporter->pState->numOfTotal; if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) {
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); tscTrace("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfTotal);
return;
}
if (finished >= numOfTotal) { tscTrace("%p all %d secondary subqueries retrieval completed, code:%d", tres, pState->numOfTotal, pParentSql->res.code);
assert(finished == numOfTotal);
tscTrace("%p all %d secondary subquery retrieves completed, global code:%d", tres, numOfTotal,
pParentSql->res.code);
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = pSupporter->pState->code;
freeJoinSubqueryObj(pParentSql); freeJoinSubqueryObj(pParentSql);
pParentSql->res.completed = true; pParentSql->res.completed = true;
} }
@ -881,10 +916,6 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
// data has retrieved to client, build the join results // data has retrieved to client, build the join results
tscBuildResFromSubqueries(pParentSql); tscBuildResFromSubqueries(pParentSql);
} else {
tscTrace("%p sub:%p completed, completed:%d, total:%d", pParentSql, tres, finished, numOfTotal);
}
}
} }
static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) { static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) {
@ -902,7 +933,7 @@ static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch
} }
pState->numOfTotal = pSql->numOfSubs; pState->numOfTotal = pSql->numOfSubs;
pState->numOfCompleted = pSql->numOfSubs - numOfFetch; pState->numOfRemain = numOfFetch;
return pSupporter; return pSupporter;
} }
@ -990,7 +1021,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
pSupporter->subqueryIndex, pTableMetaInfo->vgroupIndex); pSupporter->subqueryIndex, pTableMetaInfo->vgroupIndex);
tscResetForNextRetrieve(pRes1); tscResetForNextRetrieve(pRes1);
pSql1->fp = joinRetrieveCallback; pSql1->fp = joinRetrieveFinalResCallback;
if (pCmd1->command < TSDB_SQL_LOCAL) { if (pCmd1->command < TSDB_SQL_LOCAL) {
pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
@ -1008,8 +1039,9 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
tscTrace("%p all subquery response, retrieve data", pSql); tscTrace("%p all subquery response, retrieve data", pSql);
// the column transfer support struct has been built
if (pRes->pColumnIndex != NULL) { if (pRes->pColumnIndex != NULL) {
return; // the column transfer support struct has been built return;
} }
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
@ -1047,43 +1079,54 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pSql = (SSqlObj*)tres; SSqlObj* pSql = (SSqlObj*)tres;
SJoinSupporter* pSupporter = (SJoinSupporter*)param; SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
// There is only one subquery and table for each subquery. // There is only one subquery and table for each subquery.
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
assert(pQueryInfo->numOfTables == 1 && pSql->cmd.numOfClause == 1); assert(pQueryInfo->numOfTables == 1 && pSql->cmd.numOfClause == 1);
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { // retrieve actual query results from vnode during the second stage join subquery
if (code != TSDB_CODE_SUCCESS) { // direct call joinRetrieveCallback and set the error code if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
joinRetrieveCallback(param, pSql, code); tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code);
} else { // first stage query, continue to retrieve compressed time stamp data
pSql->fp = joinRetrieveCallback;
pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql);
}
} else { // second stage join subquery
SSqlObj* pParentSql = pSupporter->pObj;
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code,
pSupporter->pState->code);
quitAllSubquery(pParentSql, pSupporter); quitAllSubquery(pParentSql, pSupporter);
tscQueueAsyncRes(pParentSql);
return; return;
} }
if (code != TSDB_CODE_SUCCESS) { // TODO here retry is required, not directly returns to client
tscError("%p sub query failed, code:%s, set global code:%s, index:%d", pSql, tstrerror(code), tstrerror(code), if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
pSupporter->subqueryIndex); assert(taos_errno(pSql) == code);
tscError("%p abort query, code:%d, global code:%d", pSql, code, pParentSql->res.code);
pParentSql->res.code = code;
pSupporter->pState->code = code;
quitAllSubquery(pParentSql, pSupporter); quitAllSubquery(pParentSql, pSupporter);
} else { tscQueueAsyncRes(pParentSql);
int32_t numOfTotal = pSupporter->pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
if (finished >= numOfTotal) { return;
assert(finished == numOfTotal); }
// retrieve <tid, tag> tuples from vnode
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
pSql->fp = tidTagRetrieveCallback;
pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql);
return;
}
// retrieve ts_comp info from vnode
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
pSql->fp = tsCompRetrieveCallback;
pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql);
return;
}
// wait for the other subqueries response from vnode
if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) {
return;
}
tscSetupOutputColumnIndex(pParentSql); tscSetupOutputColumnIndex(pParentSql);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
@ -1093,9 +1136,9 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
* data instead of returning to its invoker * data instead of returning to its invoker
*/ */
if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
pSupporter->pState->numOfCompleted = 0; // reset the record value pSupporter->pState->numOfRemain = pSupporter->pState->numOfTotal; // reset the record value
pSql->fp = joinRetrieveCallback; // continue retrieve data pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data
pSql->cmd.command = TSDB_SQL_FETCH; pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql); tscProcessSql(pSql);
} else { // first retrieve from vnode during the secondary stage sub-query } else { // first retrieve from vnode during the secondary stage sub-query
@ -1107,9 +1150,6 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
} }
} }
} }
}
}
}
///////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code); static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);
@ -1257,6 +1297,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pQueryInfo->numOfTables; pState->numOfTotal = pQueryInfo->numOfTables;
pState->numOfRemain = pState->numOfTotal;
tscTrace("%p start launch subquery, total:%d", pSql, pQueryInfo->numOfTables); tscTrace("%p start launch subquery, total:%d", pSql, pQueryInfo->numOfTables);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
@ -1264,7 +1305,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
if (pSupporter == NULL) { // failed to create support struct, abort current query if (pSupporter == NULL) { // failed to create support struct, abort current query
tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i); tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
pState->numOfCompleted = pQueryInfo->numOfTables - i - 1; pState->numOfRemain = i;
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pSql->res.code; return pSql->res.code;
@ -1342,6 +1383,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscTrace("%p retrieved query data from %d vnode(s)", pSql, pSql->numOfSubs); tscTrace("%p retrieved query data from %d vnode(s)", pSql, pSql->numOfSubs);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pSql->numOfSubs; pState->numOfTotal = pSql->numOfSubs;
pState->numOfRemain = pSql->numOfSubs;
pRes->code = TSDB_CODE_SUCCESS; pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0; int32_t i = 0;
@ -1436,7 +1479,7 @@ static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows); static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);
static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows); static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows);
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) { static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t code) {
// set no disk space error info // set no disk space error info
#ifdef WINDOWS #ifdef WINDOWS
LPVOID lpMsgBuf; LPVOID lpMsgBuf;
@ -1449,26 +1492,26 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES
tscError("sub:%p failed to flush data to disk:reason:%s", tres, strerror(errno)); tscError("sub:%p failed to flush data to disk:reason:%s", tres, strerror(errno));
#endif #endif
trsupport->pState->code = -errCode; SSqlObj* pParentSql = trsupport->pParentSqlObj;
pParentSql->res.code = code;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
pthread_mutex_unlock(&trsupport->queryMutex); pthread_mutex_unlock(&trsupport->queryMutex);
tscHandleSubqueryError(trsupport, tres, trsupport->pState->code); tscHandleSubqueryError(trsupport, tres, pParentSql->res.code);
} }
void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
SSqlObj *pPObj = trsupport->pParentSqlObj; SSqlObj *pParentSql = trsupport->pParentSqlObj;
int32_t subqueryIndex = trsupport->subqueryIndex; int32_t subqueryIndex = trsupport->subqueryIndex;
assert(pSql != NULL); assert(pSql != NULL);
SSubqueryState* pState = trsupport->pState; SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal);
pPObj->numOfSubs == pState->numOfTotal);
// retrieved in subquery failed. OR query cancelled in retrieve phase. // retrieved in subquery failed. OR query cancelled in retrieve phase.
if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) { if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) {
pState->code = pPObj->res.code;
/* /*
* kill current sub-query connection, which may retrieve data from vnodes; * kill current sub-query connection, which may retrieve data from vnodes;
@ -1476,16 +1519,16 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
*/ */
pSql->res.numOfRows = 0; pSql->res.numOfRows = 0;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; // disable retry efforts trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; // disable retry efforts
tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql, tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", pParentSql, pSql,
subqueryIndex, pState->code); subqueryIndex, pParentSql->res.code);
} }
if (numOfRows >= 0) { // current query is successful, but other sub query failed, still abort current query. if (numOfRows >= 0) { // current query is successful, but other sub query failed, still abort current query.
tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex); tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pParentSql, pSql, numOfRows, subqueryIndex);
tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql, tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pParentSql, pSql,
subqueryIndex, pState->code); subqueryIndex, pParentSql->res.code);
} else { } else {
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pState->code == TSDB_CODE_SUCCESS) { if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) {
/* /*
* current query failed, and the retry count is less than the available * current query failed, and the retry count is less than the available
* count, retry query clear previous retrieved data, then launch a new sub query * count, retry query clear previous retrieved data, then launch a new sub query
@ -1504,7 +1547,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tscError("%p sub:%p failed to create new subquery sqlObj due to out of memory, abort retry", tscError("%p sub:%p failed to create new subquery sqlObj due to out of memory, abort retry",
trsupport->pParentSqlObj, pSql); trsupport->pParentSqlObj, pSql);
pState->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
return; return;
} }
@ -1512,23 +1555,23 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tscProcessSql(pNew); tscProcessSql(pNew);
return; return;
} else { // reach the maximum retry count, abort } else { // reach the maximum retry count, abort
atomic_val_compare_exchange_32(&pState->code, TSDB_CODE_SUCCESS, numOfRows); atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows);
tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pPObj, pSql, tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pParentSql, pSql,
tstrerror(numOfRows), subqueryIndex, tstrerror(pState->code)); tstrerror(numOfRows), subqueryIndex, tstrerror(pParentSql->res.code));
} }
} }
int32_t numOfTotal = pState->numOfTotal; int32_t remain = -1;
if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) {
tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
pState->numOfTotal - remain);
int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
if (finished < numOfTotal) {
tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
return tscFreeSubSqlObj(trsupport, pSql); return tscFreeSubSqlObj(trsupport, pSql);
} }
// all subqueries are failed // all subqueries are failed
tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pPObj, pState->numOfTotal, tstrerror(pState->code)); tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfTotal,
pPObj->res.code = pState->code; tstrerror(pParentSql->res.code));
// release allocated resource // release allocated resource
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
@ -1538,13 +1581,13 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tscFreeSubSqlObj(trsupport, pSql); tscFreeSubSqlObj(trsupport, pSql);
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
(*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code); (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
} else { // regular super table query } else { // regular super table query
if (pPObj->res.code != TSDB_CODE_SUCCESS) { if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pPObj); tscQueueAsyncRes(pParentSql);
} }
} }
} }
@ -1590,14 +1633,11 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
return tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE); return tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE);
} }
// keep this value local variable, since the pState variable may be released by other threads, if atomic_add opertion int32_t remain = -1;
// increases the finished value up to pState->numOfTotal value, which means all subqueries are completed. if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) {
// In this case, the comparsion between finished value and released pState->numOfTotal is not safe. tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex,
int32_t numOfTotal = pState->numOfTotal; pState->numOfTotal - remain);
int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
if (finished < numOfTotal) {
tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
return tscFreeSubSqlObj(trsupport, pSql); return tscFreeSubSqlObj(trsupport, pSql);
} }
@ -1632,9 +1672,9 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
SRetrieveSupport *trsupport = (SRetrieveSupport *)param; SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
int32_t idx = trsupport->subqueryIndex; int32_t idx = trsupport->subqueryIndex;
SSqlObj * pPObj = trsupport->pParentSqlObj; SSqlObj * pPObj = trsupport->pParentSqlObj;
tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
SSqlObj *pSql = (SSqlObj *)tres; SSqlObj *pSql = (SSqlObj *)tres;
if (pSql == NULL) { // sql object has been released in error process, return immediately if (pSql == NULL) { // sql object has been released in error process, return immediately
@ -1643,13 +1683,12 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
} }
SSubqueryState* pState = trsupport->pState; SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pPObj->numOfSubs == pState->numOfTotal);
pPObj->numOfSubs == pState->numOfTotal);
// query process and cancel query process may execute at the same time // query process and cancel query process may execute at the same time
pthread_mutex_lock(&trsupport->queryMutex); pthread_mutex_lock(&trsupport->queryMutex);
if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) { if (numOfRows < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) {
return tscHandleSubqueryError(trsupport, pSql, numOfRows); return tscHandleSubqueryError(trsupport, pSql, numOfRows);
} }
@ -1738,18 +1777,16 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
SSubqueryState* pState = trsupport->pState; SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal);
pParentSql->numOfSubs == pState->numOfTotal);
if (pParentSql->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) { // todo set error code
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
// stable query is killed, abort further retry // stable query is killed, abort further retry
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
if (pParentSql->res.code != TSDB_CODE_SUCCESS) { if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
code = pParentSql->res.code; code = pParentSql->res.code;
} else {
code = pState->code;
} }
tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%s", pParentSql, pSql, tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%s", pParentSql, pSql,
@ -1766,7 +1803,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) { if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) {
tscTrace("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code)); tscTrace("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code));
atomic_val_compare_exchange_32(&pState->code, 0, code); atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code);
} else { // does not reach the maximum retry time, go on } else { // does not reach the maximum retry time, go on
tscTrace("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry); tscTrace("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
@ -1776,7 +1813,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d", tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d",
trsupport->pParentSqlObj, pSql, pVgroup->vgId, trsupport->subqueryIndex); trsupport->pParentSqlObj, pSql, pVgroup->vgId, trsupport->subqueryIndex);
pState->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
} else { } else {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
@ -1789,11 +1826,11 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
} }
} }
if (pState->code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query if (pParentSql->res.code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query
tscTrace("%p sub:%p query failed,ip:%u,vgId:%d,orderOfSub:%d,global code:%d", pParentSql, pSql, tscTrace("%p sub:%p query failed,ip:%u,vgId:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex, pState->code); pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex, pParentSql->res.code);
tscHandleSubqueryError(param, tres, pState->code); tscHandleSubqueryError(param, tres, pParentSql->res.code);
} else { // success, proceed to retrieve data from dnode } else { // success, proceed to retrieve data from dnode
tscTrace("%p sub:%p query complete, ip:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSqlObj, pSql, tscTrace("%p sub:%p query complete, ip:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSqlObj, pSql,
pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex); pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
@ -1813,7 +1850,6 @@ static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) {
SSqlCmd* pParentCmd = &pParentObj->cmd; SSqlCmd* pParentCmd = &pParentObj->cmd;
SSubqueryState* pState = pSupporter->pState; SSubqueryState* pState = pSupporter->pState;
int32_t total = pState->numOfTotal;
// increase the total inserted rows // increase the total inserted rows
if (numOfRows > 0) { if (numOfRows > 0) {
@ -1826,8 +1862,7 @@ static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) {
} }
taos_free_result(tres); taos_free_result(tres);
int32_t completed = atomic_add_fetch_32(&pState->numOfCompleted, 1); if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) {
if (completed < total) {
return; return;
} }
@ -2157,5 +2192,3 @@ static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
return hasData; return hasData;
} }

View File

@ -1216,7 +1216,6 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
// interval query with limit applied // interval query with limit applied
int32_t numOfRes = 0; int32_t numOfRes = 0;
if (isIntervalQuery(pQuery)) { if (isIntervalQuery(pQuery)) {
numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
} else { } else {

View File

@ -15,7 +15,6 @@
#include <stdint.h> #include <stdint.h>
#include <pthread.h> #include <pthread.h>
#include <errno.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>

View File

@ -114,46 +114,46 @@ sql drop table tb
sql drop table mt sql drop table mt
sleep 3000 sleep 3000
## ALTER TABLE WHILE STREAMING [TBASE271] ### ALTER TABLE WHILE STREAMING [TBASE271]
sql create table tb1 (ts timestamp, c1 int, c2 nchar(5), c3 int) #sql create table tb1 (ts timestamp, c1 int, c2 nchar(5), c3 int)
sql create table strm as select count(*), avg(c1), first(c2), sum(c3) from tb1 interval(2s) #sql create table strm as select count(*), avg(c1), first(c2), sum(c3) from tb1 interval(2s)
sql select * from strm #sql select * from strm
if $rows != 0 then #if $rows != 0 then
return -1 # return -1
endi #endi
#sleep 12000 ##sleep 12000
sql insert into tb1 values (now, 1, 'taos', 1) #sql insert into tb1 values (now, 1, 'taos', 1)
sleep 20000 #sleep 20000
sql select * from strm #sql select * from strm
print rows = $rows #print rows = $rows
if $rows != 1 then #if $rows != 1 then
return -1 # return -1
endi #endi
if $data04 != 1 then #if $data04 != 1 then
return -1 # return -1
endi #endi
sql alter table tb1 drop column c3 #sql alter table tb1 drop column c3
sleep 6000 #sleep 6000
sql insert into tb1 values (now, 2, 'taos') #sql insert into tb1 values (now, 2, 'taos')
sleep 30000 #sleep 30000
sql select * from strm #sql select * from strm
if $rows != 2 then #if $rows != 2 then
return -1 # return -1
endi #endi
if $data04 != 1 then #if $data04 != 1 then
return -1 # return -1
endi #endi
sql alter table tb1 add column c3 int #sql alter table tb1 add column c3 int
sleep 6000 #sleep 6000
sql insert into tb1 values (now, 3, 'taos', 3); #sql insert into tb1 values (now, 3, 'taos', 3);
sleep 3000 #sleep 3000
sql select * from strm #sql select * from strm
if $rows != 3 then #if $rows != 3 then
return -1 # return -1
endi #endi
if $data04 != 1 then #if $data04 != 1 then
return -1 # return -1
endi #endi
## ALTER TABLE AND INSERT BY COLUMNS ## ALTER TABLE AND INSERT BY COLUMNS
sql create table mt (ts timestamp, c1 int, c2 int) tags(t1 int) sql create table mt (ts timestamp, c1 int, c2 int) tags(t1 int)

View File

@ -82,26 +82,22 @@ sleep 2000
run general/parser/groupby.sim run general/parser/groupby.sim
sleep 2000 sleep 2000
run general/parser/set_tag_vals.sim run general/parser/set_tag_vals.sim
sleep 2000 sleep 2000
run general/parser/slimit_alter_tags.sim # persistent failed run general/parser/slimit_alter_tags.sim # persistent failed
sleep 2000 sleep 2000
run general/parser/join.sim run general/parser/join.sim
sleep 2000 sleep 2000
run general/parser/join_multivnode.sim run general/parser/join_multivnode.sim
sleep 2000 sleep 2000
#run general/parser/repeatAlter.sim run general/parser/repeatAlter.sim
sleep 2000
#run general/parser/repeatStream.sim
sleep 2000 sleep 2000
run general/parser/binary_escapeCharacter.sim run general/parser/binary_escapeCharacter.sim
sleep 2000 sleep 2000
run general/parser/bug.sim run general/parser/bug.sim
sleep 2000 #sleep 2000
run general/parser/stream_on_sys.sim #run general/parser/repeatStream.sim
sleep 2000 #sleep 2000
run general/parser/stream.sim #run general/parser/stream_on_sys.sim
#sleep 2000
#run general/parser/stream.sim