add the union support in sql parser: refactor some codes. #1032. [TBASE-1140]

This commit is contained in:
hjxilinx 2020-01-08 17:04:00 +08:00
parent 3cc8eb0917
commit b058e4e85b
6 changed files with 124 additions and 87 deletions

View File

@ -241,6 +241,7 @@ void tscPrintSelectClause(SSqlCmd* pCmd, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql); bool hasMoreVnodesToTry(SSqlObj *pSql);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)());
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -36,8 +36,8 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
* If sql queries upon a super table and two-stage merge procedure is not involved (when employ the projection * If sql queries upon a super table and two-stage merge procedure is not involved (when employ the projection
* query), it will sequentially query&retrieve data for all vnodes * query), it will sequentially query&retrieve data for all vnodes
*/ */
static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
// TODO return the correct error code to client in tscQueueAsyncError // TODO return the correct error code to client in tscQueueAsyncError
void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) { void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) {
@ -80,7 +80,6 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *,
return; return;
} }
pSql->sqlstr = malloc(sqlLen + 1); pSql->sqlstr = malloc(sqlLen + 1);
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql); tscError("%p failed to malloc sql string buffer", pSql);
@ -108,7 +107,7 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *,
tscDoQuery(pSql); tscDoQuery(pSql);
} }
static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) { static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
if (tres == NULL) { if (tres == NULL) {
return; return;
} }
@ -121,6 +120,15 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes. if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode); tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
} else { } else {
/*
* all available virtual node has been checked already, now we need to check
* for the next subclause queries
*/
if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return;
}
/* /*
* 1. has reach the limitation * 1. has reach the limitation
* 2. no remain virtual nodes to be retrieved anymore * 2. no remain virtual nodes to be retrieved anymore
@ -150,7 +158,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
if (pRes->qhandle == 0 || numOfRows != 0) { if ((pRes->qhandle == 0 || numOfRows != 0) && pCmd->command < TSDB_SQL_LOCAL) {
if (pRes->qhandle == 0) { if (pRes->qhandle == 0) {
tscError("qhandle is NULL"); tscError("qhandle is NULL");
} else { } else {
@ -175,12 +183,12 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
*/ */
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows) { static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
// query completed, continue to retrieve // query completed, continue to retrieve
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscProcessAsyncFetchRowsProxy); tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchRowsProxy);
} }
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows) { void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
// query completed, continue to retrieve // query completed, continue to retrieve
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscProcessAsyncFetchSingleRowProxy); tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchSingleRowProxy);
} }
void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), void *param) { void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), void *param) {
@ -203,7 +211,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
// user-defined callback function is stored in fetchFp // user-defined callback function is stored in fetchFp
pSql->fetchFp = fp; pSql->fetchFp = fp;
pSql->fp = tscProcessAsyncFetchRowsProxy; pSql->fp = tscAsyncFetchRowsProxy;
pSql->param = param; pSql->param = param;
tscResetForNextRetrieve(pRes); tscResetForNextRetrieve(pRes);
@ -238,7 +246,7 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
if (pRes->row >= pRes->numOfRows) { if (pRes->row >= pRes->numOfRows) {
tscResetForNextRetrieve(pRes); tscResetForNextRetrieve(pRes);
pSql->fp = tscProcessAsyncFetchSingleRowProxy; pSql->fp = tscAsyncFetchSingleRowProxy;
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) { if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
@ -255,7 +263,7 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
} }
} }
void tscProcessAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) { void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
SSqlObj *pSql = (SSqlObj *)tres; SSqlObj *pSql = (SSqlObj *)tres;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
@ -352,7 +360,7 @@ void tscQueueAsyncRes(SSqlObj *pSql) {
tscTrace("%p SqlObj is freed, not add into queue async res", pSql); tscTrace("%p SqlObj is freed, not add into queue async res", pSql);
return; return;
} else { } else {
tscTrace("%p add into queued async res, code:%d", pSql, pSql->res.code); tscError("%p add into queued async res, code:%d", pSql, pSql->res.code);
} }
SSchedMsg schedMsg; SSchedMsg schedMsg;
@ -513,7 +521,8 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
} }
} }
if (code != 0) { if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
return; return;
} }

View File

@ -448,6 +448,8 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
pQueryInfo->order.order = TSQL_SO_ASC; pQueryInfo->order.order = TSQL_SO_ASC;
tscClearFieldInfo(&pQueryInfo->fieldsInfo);
tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 0, TSDB_DATA_TYPE_BINARY, columnName, valueLength); tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 0, TSDB_DATA_TYPE_BINARY, columnName, valueLength);
tscInitResObjForLocalQuery(pSql, 1, valueLength); tscInitResObjForLocalQuery(pSql, 1, valueLength);

View File

@ -1483,11 +1483,10 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
*((uint16_t *)pMsg) = htons(pQueryInfo->type); *((uint16_t *)pMsg) = htons(pQueryInfo->type);
pMsg += sizeof(pQueryInfo->type); pMsg += sizeof(pQueryInfo->type);
msgLen = pMsg - pStart; pSql->cmd.payloadLen = pMsg - pStart;
pSql->cmd.payloadLen = msgLen;
pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE; pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE;
return msgLen; return TSDB_CODE_SUCCESS;
} }
void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) { void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
@ -2159,7 +2158,7 @@ int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd->payloadLen = pMsg - pStart; pCmd->payloadLen = pMsg - pStart;
pCmd->msgType = TSDB_MSG_TYPE_DROP_USER; pCmd->msgType = TSDB_MSG_TYPE_DROP_USER;
return msgLen; return TSDB_CODE_SUCCESS;
} }
int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
@ -2535,8 +2534,6 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
pRes->rspType = 1; pRes->rspType = 1;
tscSetResultPointer(pQueryInfo, pRes); tscSetResultPointer(pQueryInfo, pRes);
pRes->row = 0;
} else { } else {
tscResetForNextRetrieve(pRes); tscResetForNextRetrieve(pRes);
} }
@ -2625,11 +2622,10 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sizeof(SConnectMsg); pMsg += sizeof(SConnectMsg);
msgLen = pMsg - pStart; pCmd->payloadLen = pMsg - pStart;
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_CONNECT; pCmd->msgType = TSDB_MSG_TYPE_CONNECT;
return msgLen; return TSDB_CODE_SUCCESS;
} }
int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
@ -2678,7 +2674,7 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tfree(tmpData); tfree(tmpData);
assert(msgLen + minMsgSize() <= pCmd->allocSize); assert(msgLen + minMsgSize() <= pCmd->allocSize);
return msgLen; return TSDB_CODE_SUCCESS;
} }
/** /**
@ -2881,7 +2877,8 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd->payloadLen = msgLen; pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_METRIC_META; pCmd->msgType = TSDB_MSG_TYPE_METRIC_META;
assert(msgLen + minMsgSize() <= size); assert(msgLen + minMsgSize() <= size);
return msgLen;
return TSDB_CODE_SUCCESS;
} }
int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) { int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {

View File

@ -543,7 +543,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
pRes->numOfTotalInCurrentClause++; pRes->numOfTotalInCurrentClause++;
break; break;
} else {// continue retrieve data from vnode } else { // continue retrieve data from vnode
if (!tscHashRemainDataInSubqueryResultSet(pSql)) { if (!tscHashRemainDataInSubqueryResultSet(pSql)) {
tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1); tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1);
SSubqueryState *pState = NULL; SSubqueryState *pState = NULL;
@ -611,6 +611,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
tscProcessSql(pSql); // retrieve data from virtual node tscProcessSql(pSql); // retrieve data from virtual node
//if failed to retrieve data from current virtual node, try next one if exists
if (hasMoreVnodesToTry(pSql)) { if (hasMoreVnodesToTry(pSql)) {
tscTryQueryNextVnode(pSql, NULL); tscTryQueryNextVnode(pSql, NULL);
} }
@ -634,7 +635,6 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
TAOS_ROW taos_fetch_row(TAOS_RES *res) { TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res; SSqlObj *pSql = (SSqlObj *)res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
globalCode = TSDB_CODE_DISCONNECTED; globalCode = TSDB_CODE_DISCONNECTED;
@ -646,25 +646,13 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
* instead of two-stage merge * instead of two-stage merge
*/ */
TAOS_ROW rows = taos_fetch_row_impl(res); TAOS_ROW rows = taos_fetch_row_impl(res);
if (rows != NULL) {
return rows;
}
// current subclause is completed, try the next subclause // current subclause is completed, try the next subclause
while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) { while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); tscTryQueryNextClause(pSql, NULL);
pSql->cmd.command = pQueryInfo->command;
pCmd->clauseIndex++;
assert(pSql->fp == NULL);
pRes->numOfTotal += pRes->numOfTotalInCurrentClause;
pRes->numOfTotalInCurrentClause = 0;
pRes->rspType = 0;
pSql->numOfSubs = 0;
tfree(pSql->pSubs);
tscTrace("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause);
tscProcessSql(pSql);
// if the rows is not NULL, return immediately // if the rows is not NULL, return immediately
rows = taos_fetch_row_impl(res); rows = taos_fetch_row_impl(res);

View File

@ -38,7 +38,7 @@
* tagId2,...] + '.' + group_orderType * tagId2,...] + '.' + group_orderType
*/ */
void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* str, uint64_t uid) { void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* str, uint64_t uid) {
int32_t index = -1; int32_t index = -1;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoByUid(pQueryInfo, uid, &index); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoByUid(pQueryInfo, uid, &index);
int32_t len = 0; int32_t len = 0;
@ -228,7 +228,7 @@ bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
return false; return false;
} }
bool tscProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex) { bool tscProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, tableIndex);
/* /*
@ -1887,7 +1887,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
printf("the metricmeta key is:%s\n", key); printf("the metricmeta key is:%s\n", key);
#endif #endif
char* name = pMeterMetaInfo->name; char* name = pMeterMetaInfo->name;
SMeterMetaInfo* pFinalInfo = NULL; SMeterMetaInfo* pFinalInfo = NULL;
if (pPrevSql == NULL) { if (pPrevSql == NULL) {
@ -1911,10 +1911,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
assert(pFinalInfo->pMetricMeta != NULL); assert(pFinalInfo->pMetricMeta != NULL);
} }
tscTrace("%p new subquery %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d," tscTrace(
"fieldInfo:%d, name:%s", pSql, pNew, tableIndex, "%p new subquery %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d,"
pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, "fieldInfo:%d, name:%s",
pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name); pSql, pNew, tableIndex, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs,
pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name);
return pNew; return pNew;
} }
@ -1997,20 +1998,27 @@ char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; }
* If current vnode query does not return results anymore (pRes->numOfRows == 0), try the next vnode if exists, * If current vnode query does not return results anymore (pRes->numOfRows == 0), try the next vnode if exists,
* in case of multi-vnode super table projection query and the result does not reach the limitation. * in case of multi-vnode super table projection query and the result does not reach the limitation.
*/ */
bool hasMoreVnodesToTry(SSqlObj *pSql) { bool hasMoreVnodesToTry(SSqlObj* pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
return pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
if (!UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
return false;
}
int32_t totalVnode = pMeterMetaInfo->pMetricMeta->numOfVnodes;
return pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0) &&
(!tscHasReachLimitation(pQueryInfo, pRes)) && (pMeterMetaInfo->vnodeIndex < totalVnode - 1);
} }
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp) { void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
/* /*
* no result returned from the current virtual node anymore, try the next vnode if exists * no result returned from the current virtual node anymore, try the next vnode if exists
@ -2018,7 +2026,7 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp) {
*/ */
assert(pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes)); assert(pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int32_t totalVnode = pMeterMetaInfo->pMetricMeta->numOfVnodes; int32_t totalVnode = pMeterMetaInfo->pMetricMeta->numOfVnodes;
while (++pMeterMetaInfo->vnodeIndex < totalVnode) { while (++pMeterMetaInfo->vnodeIndex < totalVnode) {
@ -2060,7 +2068,7 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp) {
pSql->fp = fp; pSql->fp = fp;
if (fp1 != NULL) { if (fp1 != NULL) {
assert(fp != NULL); assert(fp != NULL);
} }
int32_t ret = tscProcessSql(pSql); // todo check for failure int32_t ret = tscProcessSql(pSql); // todo check for failure
@ -2094,3 +2102,35 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp) {
tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal); tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal);
} }
} }
void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
// current subclause is completed, try the next subclause
assert(pCmd->clauseIndex < pCmd->numOfClause - 1);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pSql->cmd.command = pQueryInfo->command;
pCmd->clauseIndex++;
pRes->numOfTotal += pRes->numOfTotalInCurrentClause;
pRes->numOfTotalInCurrentClause = 0;
pRes->rspType = 0;
pSql->numOfSubs = 0;
tfree(pSql->pSubs);
if (pSql->fp != NULL) {
pSql->fp = queryFp;
assert(queryFp != NULL);
}
tscTrace("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause);
if (pCmd->command > TSDB_SQL_LOCAL) {
tscProcessLocalCmd(pSql);
} else {
tscProcessSql(pSql);
}
}