diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 4fb6ee6f41..f39169c193 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -211,27 +211,27 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) { pSql->fp = tscAsyncFetchRowsProxy; pSql->param = param; - if (pRes->qId == 0) { - tscError("qhandle is invalid"); - pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; - tscAsyncResultOnError(pSql); - return; - } - tscResetForNextRetrieve(pRes); // handle outer query based on the already retrieved nest query results. SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { SSchedMsg schedMsg = {0}; - schedMsg.fp = doRetrieveSubqueryData; + schedMsg.fp = doRetrieveSubqueryData; schedMsg.ahandle = (void *)pSql; schedMsg.thandle = (void *)1; - schedMsg.msg = 0; + schedMsg.msg = 0; taosScheduleTask(tscQhandle, &schedMsg); return; } + if (pRes->qId == 0) { + tscError("qhandle is invalid"); + pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; + tscAsyncResultOnError(pSql); + return; + } + if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { tscFetchDatablockForSubquery(pSql); } else if (pRes->completed) { diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 0a018c863f..819f1af4f0 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -899,8 +899,7 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { for (int32_t i = 0; i < size; ++i) { SSqlNode* pSqlNode = taosArrayGetP(pInfo->list, i); - tscTrace("%p start to parse %dth subclause, total:%"PRIzu, pSql, i, size); - + tscTrace("0x%"PRIx64" start to parse the %dth subclause, total:%"PRIzu, pSql->self, i, size); // normalizeSqlNode(pSqlNode); // normalize the column name in each function if ((code = validateSqlNode(pSql, pSqlNode, pQueryInfo)) != TSDB_CODE_SUCCESS) { return code; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index cebabfc024..7be3bbb3aa 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -392,14 +392,17 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { // single table query error need to be handled here. if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && - (((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || // change the retry procedure + (((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) || - rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || // change the retry procedure + rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY)) { - if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | + // 1. super table subquery + // 2. nest queries are all not updated the tablemeta and retry parse the sql after cleanup local tablemeta/vgroup id buffer + if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) && - !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) { + !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) || + (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY))) { // do nothing in case of super table subquery } else { pSql->retry += 1; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index c08b086d84..7dcd699db6 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2706,7 +2706,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO // release allocated resource tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, pState->numOfSub); - tscFreeRetrieveSup(pSql); // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes @@ -2717,10 +2716,13 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO int32_t code = pParentSql->res.code; if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) { // remove the cached tableMeta and vgroup id list, and then parse the sql again - STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentSql->cmd, 0); + SSqlCmd* pParentCmd = &pParentSql->cmd; + STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0); tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self); - tscResetSqlCmd(&pParentSql->cmd, true); + pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap); + pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + pParentSql->res.code = TSDB_CODE_SUCCESS; pParentSql->retry++; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 288cefcde0..a68f8deced 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3180,6 +3180,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) { } pQueryInfo->tsBuf = tsBufDestroy(pQueryInfo->tsBuf); + pQueryInfo->fillType = 0; tfree(pQueryInfo->fillVal); tfree(pQueryInfo->buf); @@ -3818,13 +3819,64 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) { } } -// todo handle the failure static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { + SSqlObj* pSql = tres; + SRetrieveSupport* ps = param; + + if (pSql->res.code != TSDB_CODE_SUCCESS) { + SSqlObj* pParentSql = ps->pParentSql; + + int32_t index = ps->subqueryIndex; + bool ret = subAndCheckDone(pSql, pParentSql, index); + + tfree(ps); + pSql->param = NULL; + + if (!ret) { + tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d completed, not all subquery finished", pParentSql->self, pSql->self, index); + return; + } + + // todo refactor + tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self); + + SSqlCmd* pParentCmd = &pParentSql->cmd; + STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0); + tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self); + + pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap); + pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + + pParentSql->res.code = TSDB_CODE_SUCCESS; + pParentSql->retry++; + + tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self, + tstrerror(code), pParentSql->retry); + + code = tsParseSql(pParentSql, true); + if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + return; + } + + if (code != TSDB_CODE_SUCCESS) { + pParentSql->res.code = code; + tscAsyncResultOnError(pParentSql); + return; + } + + SQueryInfo *pQueryInfo = tscGetQueryInfo(pParentCmd); + executeQuery(pParentSql, pQueryInfo); + return; + } + taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param); } // do execute the query according to the query execution plan void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t numOfInit = 0; + if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { (*pSql->fp)(pSql->param, pSql, 0); return; @@ -3839,7 +3891,12 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t)); - pthread_mutex_init(&pSql->subState.mutex, NULL); + code = pthread_mutex_init(&pSql->subState.mutex, NULL); + + if (pSql->pSubs == NULL || pSql->subState.states == NULL || code != TSDB_CODE_SUCCESS) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; + } for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i); @@ -3847,46 +3904,71 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { pSql->cmd.active = pSub; pSql->cmd.command = TSDB_SQL_SELECT; - // TODO handle memory failure SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); if (pNew == NULL) { - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscError("pNew == NULL, out of memory"); - return; + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; } - pNew->pTscObj = pSql->pTscObj; + pNew->pTscObj = pSql->pTscObj; pNew->signature = pNew; - pNew->sqlstr = strdup(pSql->sqlstr); // todo refactor - pNew->fp = tscSubqueryCompleteCallback; + pNew->sqlstr = strdup(pSql->sqlstr); + pNew->fp = tscSubqueryCompleteCallback; + pNew->maxRetry = pSql->maxRetry; tsem_init(&pNew->rspSem, 0, 0); SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id + if (ps == NULL) { + tscFreeSqlObj(pNew); + goto _error; + } + ps->pParentSql = pSql; ps->subqueryIndex = i; pNew->param = ps; pSql->pSubs[i] = pNew; - registerSqlObj(pNew); SSqlCmd* pCmd = &pNew->cmd; pCmd->command = TSDB_SQL_SELECT; - if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) { + if ((code = tscAddQueryInfo(pCmd)) != TSDB_CODE_SUCCESS) { + goto _error; } SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd); tscQueryInfoCopy(pNewQueryInfo, pSub); - // create sub query to handle the sub query. - executeQuery(pNew, pNewQueryInfo); + TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY); + numOfInit++; + } + + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + SSqlObj* psub = pSql->pSubs[i]; + registerSqlObj(psub); + + // create sub query to handle the sub query. + SQueryInfo* pq = tscGetQueryInfo(&psub->cmd); + executeQuery(psub, pq); } - // merge sub query result and generate final results return; } pSql->cmd.active = pQueryInfo; doExecuteQuery(pSql, pQueryInfo); + return; + + _error: + for(int32_t i = 0; i < numOfInit; ++i) { + SSqlObj* p = pSql->pSubs[i]; + tscFreeSqlObj(p); + } + + pSql->res.code = code; + pSql->subState.numOfSub = 0; // not initialized sub query object will not be freed + tfree(pSql->subState.states); + tfree(pSql->pSubs); + tscAsyncResultOnError(pSql); } int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) { diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 4a5497795f..94be247b0d 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -365,6 +365,7 @@ do { \ #define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u #define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file #define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type +#define TSDB_QUERY_TYPE_NEST_SUBQUERY 0x1000u // nested sub query #define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0) #define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type)) diff --git a/src/os/src/detail/osTimer.c b/src/os/src/detail/osTimer.c index c381b3e825..618df8a8ba 100644 --- a/src/os/src/detail/osTimer.c +++ b/src/os/src/detail/osTimer.c @@ -38,7 +38,7 @@ static void *taosProcessAlarmSignal(void *tharg) { struct sigevent sevent = {{0}}; - setThreadName("alarmSignal"); + setThreadName("tmr"); #ifdef _ALPINE sevent.sigev_notify = SIGEV_THREAD; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index f5c01d86e7..d59ede920f 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -486,6 +486,10 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, SMemRef* pRef) { STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, pRef); + if (pQueryHandle == NULL) { + return NULL; + } + if (emptyQueryTimewindow(pQueryHandle)) { return (TsdbQueryHandleT*) pQueryHandle; } @@ -596,6 +600,10 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable } STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef); + if (pQueryHandle == NULL) { + return NULL; + } + int32_t code = checkForCachedLastRow(pQueryHandle, groupList); if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 terrno = code; @@ -613,6 +621,10 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) { STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef); + if (pQueryHandle == NULL) { + return NULL; + } + int32_t code = checkForCachedLast(pQueryHandle); if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 terrno = code; diff --git a/tests/script/general/parser/join.sim b/tests/script/general/parser/join.sim index 2c14a86c3a..e2132589bd 100644 --- a/tests/script/general/parser/join.sim +++ b/tests/script/general/parser/join.sim @@ -444,6 +444,10 @@ if $rows != $val then return -1 endi +print ================>TD-5600 +sql select first(join_tb0.c8),first(join_tb0.c9) from join_tb1 , join_tb0 where join_tb1.ts = join_tb0.ts and join_tb1.ts <= 100002 and join_tb1.ts>=100000 interval(1s) fill(linear); + + #=============================================================== sql select first(join_tb0.c8),first(join_tb0.c9) from join_tb1 , join_tb0 where join_tb1.ts = join_tb0.ts and join_tb1.ts <= 100002 and join_tb0.c7 = true