From 84158d861d24ebe38a8282dfa556364241346837 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 2 Jun 2022 16:54:35 +0800 Subject: [PATCH] enh(query): query APIs are built on top of async APIs. --- include/client/taos.h | 1 - include/libs/scheduler/scheduler.h | 2 - source/client/inc/clientInt.h | 18 +- source/client/src/clientEnv.c | 9 +- source/client/src/clientImpl.c | 286 ++++++++++++++++--------- source/client/src/clientMain.c | 239 +++++++++++++++++++-- source/client/src/clientMsgHandler.c | 59 +++-- source/client/src/clientSml.c | 4 +- source/client/src/clientStmt.c | 5 +- source/client/test/clientTests.cpp | 94 +++++++- source/client/test/smlTest.cpp | 30 +-- source/common/src/systable.c | 2 +- source/libs/parser/src/parTranslater.c | 4 - source/libs/scheduler/src/schDbg.c | 9 - source/libs/scheduler/src/scheduler.c | 19 +- tests/script/tsim/testsuit.sim | 183 +++++++++------- 16 files changed, 685 insertions(+), 279 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 0b8c67aa79..11cfc4c048 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -151,7 +151,6 @@ DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt); DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); -DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 0d32cce20b..cdbd4f784c 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -121,8 +121,6 @@ void schedulerFreeJob(int64_t job); void schedulerDestroy(void); void schdExecCallback(SQueryResult* pResult, void* param, int32_t code); -void schdFetchCallback(void* pResult, void* param, int32_t code); - #ifdef __cplusplus } diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index d9f3351008..449a9853a0 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -179,7 +179,9 @@ typedef struct SReqResultInfo { typedef struct SRequestSendRecvBody { tsem_t rspSem; // not used now - void* fp; + __taos_async_fn_t queryFp; + __taos_async_fn_t fetchFp; + void* param; SDataBuf requestMsg; int64_t queryJob; // query job, created according to sql query DAG. struct SQueryPlan* pDag; // the query dag, generated according to the sql statement. @@ -214,12 +216,20 @@ typedef struct SRequestObj { SRequestSendRecvBody body; } SRequestObj; +typedef struct SSyncQueryParam { + tsem_t sem; + SRequestObj* pRequest; +} SSyncQueryParam; + +void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); + void doSetOneRowPtr(SReqResultInfo* pResultInfo); void setResPrecision(SReqResultInfo* pResInfo, int32_t precision); int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4); void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); void doFreeReqResultInfo(SReqResultInfo* pResInfo); +SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen); static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) { SMqRspObj* msg = (SMqRspObj*)res; @@ -265,7 +275,7 @@ int32_t releaseTscObj(int64_t rid); uint64_t generateRequestId(); -void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type); +void* createRequest(STscObj* pObj, void* param, int32_t type); void destroyRequest(SRequestObj* pRequest); SRequestObj* acquireRequest(int64_t rid); int32_t releaseRequest(int64_t rid); @@ -313,8 +323,8 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v // --- mq void hbMgrInitMqHbRspHandle(); -SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res); -int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList); +SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res); +void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery); int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** res); int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 669b2bc97e..4e7ee39692 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -13,10 +13,11 @@ * along with this program. If not, see . */ +#include "os.h" #include "catalog.h" +#include "functionMgt.h" #include "clientInt.h" #include "clientLog.h" -#include "os.h" #include "query.h" #include "scheduler.h" #include "tcache.h" @@ -170,7 +171,7 @@ STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientCon int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); } -void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t type) { +void *createRequest(STscObj *pObj, void *param, int32_t type) { assert(pObj != NULL); SRequestObj *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj)); @@ -184,9 +185,10 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty pRequest->requestId = generateRequestId(); pRequest->metric.start = taosGetTimestampUs(); + pRequest->body.param = param; + pRequest->type = type; pRequest->pTscObj = pObj; - pRequest->body.fp = fp; // not used it yet pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE; tsem_init(&pRequest->body.rspSem, 0, 0); @@ -285,6 +287,7 @@ void taos_init_imp(void) { taosSetCoreDump(true); initTaskQueue(); + fmFuncMgtInit(); clientConnRefPool = taosOpenRef(200, destroyTscObj); clientReqRefPool = taosOpenRef(40960, doDestroyRequest); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index eb4c4cb59f..772ad6177a 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -132,7 +132,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, } int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) { - *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT); + *pRequest = createRequest(pTscObj, NULL, TSDB_SQL_SELECT); if (*pRequest == NULL) { tscError("failed to malloc sqlObj"); return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -190,6 +190,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision); } } + if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) { TSWAP(pRequest->dbList, (*pQuery)->pDbList); TSWAP(pRequest->tableList, (*pQuery)->pTableList); @@ -228,24 +229,52 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { return TSDB_CODE_SUCCESS; } +static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { + return pRequest->pTscObj->pAppInfo; +} + +int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { + // drop table if exists not_exists_table + if (NULL == pQuery->pCmdMsg) { + return TSDB_CODE_SUCCESS; + } + + SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg; + pRequest->type = pMsgInfo->msgType; + pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL}; + pMsgInfo->pMsg = NULL; // pMsg transferred to SMsgSendInfo management + + SAppInstInfo* pAppInfo = getAppInfo(pRequest); + SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); + + int64_t transporterId = 0; + asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg); + return TSDB_CODE_SUCCESS; +} + int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) { pRequest->type = pQuery->msgType; + SAppInstInfo* pAppInfo = getAppInfo(pRequest); + SPlanContext cxt = {.queryId = pRequest->requestId, .acctId = pRequest->pTscObj->acctId, - .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp), + .mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp), .pAstRoot = pQuery->pRoot, .showRewrite = pQuery->showRewrite, .pMsg = pRequest->msgBuf, .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE}; - SEpSet mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); - SCatalog* pCatalog = NULL; - int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + + SCatalog* pCatalog = NULL; + int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog); + if (TSDB_CODE_SUCCESS == code) { - code = catalogGetQnodeList(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, pNodeList); + code = catalogGetQnodeList(pCatalog, pAppInfo->pTransporter, &cxt.mgmtEpSet, pNodeList); } + if (TSDB_CODE_SUCCESS == code) { code = qCreateQueryPlan(&cxt, pPlan, pNodeList); } + return code; } @@ -338,7 +367,7 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) { void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; - SQueryResult res = {.code = 0, .numOfRows = 0}; + SQueryResult res = {0}; int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, pRequest->metric.start, &res); if (code != TSDB_CODE_SUCCESS) { @@ -368,11 +397,6 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList return pRequest->code; } -int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList) { - *pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); - return getPlan(pRequest, pQuery, &pRequest->body.pDag, *pNodeList); -} - int32_t validateSversion(SRequestObj* pRequest, void* res) { SArray* pArray = NULL; int32_t code = 0; @@ -444,35 +468,42 @@ void freeRequestRes(SRequestObj* pRequest, void* res) { } } -SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res) { +void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { + SRequestObj* pRequest = (SRequestObj*) param; + + // return to client + pRequest->body.queryFp(pRequest->body.param, pRequest, code); +} + +SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) { void* pRes = NULL; - if (TSDB_CODE_SUCCESS == code) { - switch (pQuery->execMode) { - case QUERY_EXEC_MODE_LOCAL: - code = execLocalCmd(pRequest, pQuery); - break; - case QUERY_EXEC_MODE_RPC: - code = execDdlQuery(pRequest, pQuery); - break; - case QUERY_EXEC_MODE_SCHEDULE: { - SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); - code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList); - if (TSDB_CODE_SUCCESS == code) { - code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList, &pRes); - if (NULL != pRes) { - code = validateSversion(pRequest, pRes); - } + int32_t code = 0; + switch (pQuery->execMode) { + case QUERY_EXEC_MODE_LOCAL: + code = execLocalCmd(pRequest, pQuery); + break; + case QUERY_EXEC_MODE_RPC: + code = execDdlQuery(pRequest, pQuery); + break; + case QUERY_EXEC_MODE_SCHEDULE: { + SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); + code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList); + if (TSDB_CODE_SUCCESS == code) { + code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList, &pRes); + if (NULL != pRes) { + code = validateSversion(pRequest, pRes); } - taosArrayDestroy(pNodeList); - break; } - case QUERY_EXEC_MODE_EMPTY_RESULT: - pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT; - break; - default: - break; + + taosArrayDestroy(pNodeList); + break; } + case QUERY_EXEC_MODE_EMPTY_RESULT: + pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT; + break; + default: + break; } if (!keepQuery) { @@ -509,7 +540,70 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) { return pRequest; } - return launchQueryImpl(pRequest, pQuery, code, false, NULL); + return launchQueryImpl(pRequest, pQuery, false, NULL); +} + +void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) { + void* pRes = NULL; + + int32_t code = 0; + switch (pQuery->execMode) { + case QUERY_EXEC_MODE_LOCAL: + code = execLocalCmd(pRequest, pQuery); + break; + case QUERY_EXEC_MODE_RPC: + code = asyncExecDdlQuery(pRequest, pQuery); + break; + case QUERY_EXEC_MODE_SCHEDULE: { + SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); + + pRequest->type = pQuery->msgType; + + SPlanContext cxt = {.queryId = pRequest->requestId, + .acctId = pRequest->pTscObj->acctId, + .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp), + .pAstRoot = pQuery->pRoot, + .showRewrite = pQuery->showRewrite, + .pMsg = pRequest->msgBuf, + .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE}; + + SAppInstInfo* pAppInfo = getAppInfo(pRequest); + + if (TSDB_CODE_SUCCESS == code) { + code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList); + } + + if (TSDB_CODE_SUCCESS == code) { + schedulerAsyncExecJob(pAppInfo->pTransporter, pNodeList, pRequest->body.pDag, &pRequest->body.queryJob, + pRequest->sqlstr, pRequest->metric.start, schedulerExecCb, pRequest); + // if (NULL != pRes) { + // code = validateSversion(pRequest, pRes); + // } + } + + taosArrayDestroy(pNodeList); + break; + } + case QUERY_EXEC_MODE_EMPTY_RESULT: + pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT; + break; + default: + break; + } + + // if (!keepQuery) { + // qDestroyQuery(pQuery); + // } + + if (NULL != pRequest && TSDB_CODE_SUCCESS != code) { + pRequest->code = terrno; + } + + // if (res) { + // *res = pRes; + // } else { +// freeRequestRes(pRequest, pRes); +// pRes = NULL; } int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { @@ -592,17 +686,6 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { return pRequest; } -TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) { - STscObj* pTscObj = (STscObj*)taos; - if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { - tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); - terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; - return NULL; - } - - return execQuery(pTscObj, sql, sqlLen); -} - int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) { pEpSet->version = 0; @@ -652,7 +735,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t return pTscObj; } - SRequestObj* pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT); + SRequestObj* pRequest = createRequest(pTscObj, param, TDMT_MND_CONNECT); if (pRequest == NULL) { destroyTscObj(pTscObj); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -765,11 +848,9 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, } } - void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle; assert(pMsg->info.ahandle != NULL); - SRequestObj* pRequest = NULL; STscObj* pTscObj = NULL; if (pSendInfo->requestObjRefId != 0) { @@ -870,58 +951,7 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo) { } } -void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) { - assert(pRequest != NULL); - - SReqResultInfo* pResultInfo = &pRequest->body.resInfo; - if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { - // All data has returned to App already, no need to try again - if (pResultInfo->completed) { - pResultInfo->numOfRows = 0; - return NULL; - } - - tsem_init(&schdRspSem, 0, 0); - - SReqResultInfo* pResInfo = &pRequest->body.resInfo; - SSchdFetchParam param = {.pData = (void**)&pResInfo->pData, .code = &pRequest->code}; - pRequest->code = schedulerAsyncFetchRows(pRequest->body.queryJob, schdFetchCallback, ¶m); - if (pRequest->code != TSDB_CODE_SUCCESS) { - pResultInfo->numOfRows = 0; - return NULL; - } - - tsem_wait(&schdRspSem); - if (pRequest->code != TSDB_CODE_SUCCESS) { - pResultInfo->numOfRows = 0; - return NULL; - } - - pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4); - if (pRequest->code != TSDB_CODE_SUCCESS) { - pResultInfo->numOfRows = 0; - return NULL; - } - - tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, - pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId); - - if (pResultInfo->numOfRows == 0) { - return NULL; - } - } - - if (setupOneRowPtr) { - doSetOneRowPtr(pResultInfo); - pResultInfo->current += 1; - } - - return pResultInfo->row; -} - - void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) { - //return doAsyncFetchRows(pRequest, setupOneRowPtr, convertUcs4); assert(pRequest != NULL); SReqResultInfo* pResultInfo = &pRequest->body.resInfo; @@ -961,6 +991,58 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) return pResultInfo->row; } +static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) { + SSyncQueryParam* pParam = param; + tsem_post(&pParam->sem); +} + +void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) { + assert(pRequest != NULL); + + SReqResultInfo* pResultInfo = &pRequest->body.resInfo; + if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { + // All data has returned to App already, no need to try again + if (pResultInfo->completed) { + pResultInfo->numOfRows = 0; + return NULL; + } + + SSyncQueryParam* pParam = pRequest->body.param; + + // always converted in async query: convertUcs4 + taos_fetch_rows_a(pRequest, syncFetchFn, pParam); + tsem_wait(&pParam->sem); + } + + /* + pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData); + if (pRequest->code != TSDB_CODE_SUCCESS) { + pResultInfo->numOfRows = 0; + return NULL; + } + + pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4); + if (pRequest->code != TSDB_CODE_SUCCESS) { + pResultInfo->numOfRows = 0; + return NULL; + } + + tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, + pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId); + + if (pResultInfo->numOfRows == 0) { + return NULL; + } + */ + + if (setupOneRowPtr) { + doSetOneRowPtr(pResultInfo); + pResultInfo->current += 1; + } + + return pResultInfo->row; +} + static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) { if (pResInfo->row == NULL) { pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 53eb443b36..b69757e5cc 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -175,12 +175,41 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { return pResInfo->userFields; } +static void syncQueryFn(void* param, void* res, int32_t code) { + SSyncQueryParam* pParam = param; + pParam->pRequest = res; + pParam->pRequest->code = code; + + printf("ready to go in query rsp---------------\n"); + tsem_post(&pParam->sem); +} + TAOS_RES *taos_query(TAOS *taos, const char *sql) { if (taos == NULL || sql == NULL) { return NULL; } - return taos_query_l(taos, sql, (int32_t)strlen(sql)); + STscObj* pTscObj = (STscObj*)taos; +#if 0 + size_t sqlLen = strlen(sql); + if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { + tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); + terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; + return NULL; + } + + return execQuery(pTscObj, sql, sqlLen); +#endif + + SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(struct SSyncQueryParam)); + tsem_init(¶m->sem, 0, 0); + + taos_query_a(pTscObj, sql, syncQueryFn, param); + + printf("start to waiting\n"); + tsem_wait(¶m->sem); + + return param->pRequest; } TAOS_ROW taos_fetch_row(TAOS_RES *res) { @@ -195,8 +224,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return NULL; } - return doFetchRows(pRequest, true, true); - + return doAsyncFetchRow(pRequest, true, true); +// return doFetchRows(pRequest, true, true); } else if (TD_RES_TMQ(res)) { SMqRspObj *msg = ((SMqRspObj *)res); SReqResultInfo *pResultInfo; @@ -205,6 +234,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { } else { pResultInfo = tmqGetCurResInfo(res); } + if (pResultInfo->current < pResultInfo->numOfRows) { doSetOneRowPtr(pResultInfo); pResultInfo->current += 1; @@ -563,38 +593,205 @@ const char *taos_get_server_info(TAOS *taos) { return pTscObj->ver; } +typedef struct SqlParseWrapper { + SParseContext* pCtx; + SCatalogReq catalogReq; + SRequestObj* pRequest; + SQuery* pQuery; +} SqlParseWrapper; + +void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) { + SqlParseWrapper *pWrapper = (SqlParseWrapper*) param; + SQuery* pQuery = pWrapper->pQuery; + SRequestObj* pRequest = pWrapper->pRequest; + + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + if (pQuery->haveResultSet) { + setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, (pQuery)->numOfResCols); + setResPrecision(&pRequest->body.resInfo, (pQuery)->precision); + } + + TSWAP(pRequest->dbList, (pQuery)->pDbList); + TSWAP(pRequest->tableList, (pQuery)->pTableList); + + taosMemoryFree(pWrapper); + launchAsyncQuery(pRequest, pQuery); + return; + + _error: + taosMemoryFree(pWrapper); + tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:%" PRIx64, pRequest->self, tstrerror(code), + pRequest->requestId); + pRequest->code = code; + pRequest->body.queryFp(pRequest->body.param, pRequest, code); +} + +// todo add retry before return user's callback void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { + ASSERT(fp != NULL); + if (taos == NULL || sql == NULL) { - fp(param, NULL, TSDB_CODE_INVALID_PARA); + terrno = TSDB_CODE_INVALID_PARA; + fp(param, NULL, terrno); return; } - SRequestObj* pRequest = NULL; + size_t sqlLen = strlen(sql); + if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { + tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); + terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; + + fp(param, NULL, terrno); + return; + } + + SRequestObj *pRequest = NULL; int32_t retryNum = 0; int32_t code = 0; - size_t sqlLen = strlen(sql); - - while (retryNum++ < REQUEST_MAX_TRY_TIMES) { - pRequest = launchQuery(taos, sql, sqlLen); - if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) { - break; - } - - code = refreshMeta(taos, pRequest); - if (code) { - pRequest->code = code; - break; - } - - destroyRequest(pRequest); + // while (retryNum++ < REQUEST_MAX_TRY_TIMES) { + code = buildRequest(taos, sql, sqlLen, &pRequest); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + fp(param, NULL, code); + return; } + pRequest->body.queryFp = fp; + pRequest->body.param = param; + + STscObj *pTscObj = pRequest->pTscObj; + + SParseContext* pCxt = taosMemoryCalloc(1, sizeof(SParseContext)); + *pCxt = (SParseContext){.requestId = pRequest->requestId, + .acctId = pTscObj->acctId, + .db = pRequest->pDb, + .topicQuery = false, + .pSql = pRequest->sqlstr, + .sqlLen = pRequest->sqlLen, + .pMsg = pRequest->msgBuf, + .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, + .pTransporter = pTscObj->pAppInfo->pTransporter, + .pStmtCb = NULL, + .pUser = pTscObj->user, + .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)), + .async = true,}; + + pCxt->mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCxt->pCatalog); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + SQuery * pQuery = NULL; + SCatalogReq catalogReq = {0}; + code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + SqlParseWrapper *pWrapper = taosMemoryCalloc(1, sizeof(SqlParseWrapper)); + pWrapper->pCtx = pCxt; + pWrapper->pQuery = pQuery; + pWrapper->pRequest = pRequest; + pWrapper->catalogReq = catalogReq; + + //todo refactor move to asyncGetAllMeta function + bool allNull = (catalogReq.pUdf == NULL && catalogReq.pUser == NULL && catalogReq.pDbCfg == NULL && + catalogReq.pIndex == NULL && catalogReq.pDbInfo == NULL && catalogReq.pDbVgroup == NULL && + catalogReq.pTableHash == NULL && catalogReq.pTableMeta == NULL && catalogReq.qNodeRequired == false); + + if (allNull) { + SMetaData* pMetaData = taosMemoryCalloc(1, sizeof(SMetaData)); + retrieveMetaCallback(pMetaData, pWrapper, TSDB_CODE_SUCCESS); + } else { + code = catalogAsyncGetAllMeta(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pRequest->requestId, + &catalogReq, retrieveMetaCallback, pWrapper, &pRequest->body.queryJob); + } + + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return; + + // todo handle the retry process + + // if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) { + // TSWAP(pRequest->dbList, (pQuery)->pDbList); + // TSWAP(pRequest->tableList, (pQuery)->pTableList); + // } + + _error: + terrno = code; + pRequest->code = code; fp(param, pRequest, code); } +static void fetchCallback(void* pResult, void* param, int32_t code) { + SRequestObj* pRequest = (SRequestObj*) param; + + SReqResultInfo* pResultInfo = &pRequest->body.resInfo; + + pResultInfo->pData = pResult; + pResultInfo->numOfRows = 0; + + if (code != TSDB_CODE_SUCCESS) { + pRequest->code = code; + pRequest->body.fetchFp(pRequest->body.param, pRequest, 0); + return; + } + + if (pRequest->code != TSDB_CODE_SUCCESS) { + pRequest->code = code; + pRequest->body.fetchFp(pRequest->body.param, pRequest, 0); + } + + pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResultInfo->pData, true); + if (pRequest->code != TSDB_CODE_SUCCESS) { + pResultInfo->numOfRows = 0; + + pRequest->code = code; + pRequest->body.fetchFp(pRequest->body.param, pRequest, 0); + } + + tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, + pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId); + + pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); +} + void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { - // TODO + ASSERT (res != NULL && fp != NULL); + + SRequestObj *pRequest = res; + pRequest->body.fetchFp = fp; + + SReqResultInfo *pResultInfo = &pRequest->body.resInfo; + if (taos_num_fields(pRequest) == 0) { + pResultInfo->numOfRows = 0; + pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows); + return; + } + + if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { + // All data has returned to App already, no need to try again + if (pResultInfo->completed) { + pResultInfo->numOfRows = 0; + pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows); + return; + } + } + + pRequest->code = schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest); } TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index f15315fe60..e2274e46d2 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -33,7 +33,11 @@ int32_t genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) { setErrno(pRequest, code); taosMemoryFree(pMsg->pData); - tsem_post(&pRequest->body.rspSem); + if (pRequest->body.queryFp != NULL) { + pRequest->body.queryFp(pRequest->body.param, pRequest, code); + } else { + tsem_post(&pRequest->body.rspSem); + } return code; } @@ -117,7 +121,12 @@ int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { if (code != TSDB_CODE_SUCCESS) { setErrno(pRequest, code); } - tsem_post(&pRequest->body.rspSem); + + if (pRequest->body.queryFp) { + pRequest->body.queryFp(pRequest->body.param, pRequest, code); + } else { + tsem_post(&pRequest->body.rspSem); + } return code; } @@ -146,7 +155,13 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pMsg->pData); setErrno(pRequest, code); - tsem_post(&pRequest->body.rspSem); + + if (pRequest->body.queryFp != NULL) { + pRequest->body.queryFp(pRequest->body.param, pRequest, pRequest->code); + } else { + tsem_post(&pRequest->body.rspSem); + } + return code; } @@ -185,7 +200,12 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { setConnectionDB(pRequest->pTscObj, db); taosMemoryFree(pMsg->pData); - tsem_post(&pRequest->body.rspSem); + + if (pRequest->body.queryFp != NULL) { + pRequest->body.queryFp(pRequest->body.param, pRequest, pRequest->code); + } else { + tsem_post(&pRequest->body.rspSem); + } return 0; } @@ -196,11 +216,13 @@ int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); if (code != TSDB_CODE_SUCCESS) { setErrno(pRequest, code); - tsem_post(&pRequest->body.rspSem); - return code; } - tsem_post(&pRequest->body.rspSem); + if (pRequest->body.queryFp != NULL) { + pRequest->body.queryFp(pRequest->body.param, pRequest, code); + } else { + tsem_post(&pRequest->body.rspSem); + } return code; } @@ -208,21 +230,24 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; if (code != TSDB_CODE_SUCCESS) { setErrno(pRequest, code); - tsem_post(&pRequest->body.rspSem); - return code; + } else { + SDropDbRsp dropdbRsp = {0}; + tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp); + + struct SCatalog* pCatalog = NULL; + catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid); } - SDropDbRsp dropdbRsp = {0}; - tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp); - - struct SCatalog* pCatalog = NULL; - catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); - catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid); - - tsem_post(&pRequest->body.rspSem); + if (pRequest->body.queryFp != NULL) { + pRequest->body.queryFp(pRequest->body.param, pRequest, code); + } else { + tsem_post(&pRequest->body.rspSem); + } return code; } +// todo refactor: this arraylist is too large void initMsgHandleFp() { handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp; handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 7d623072d6..8e2328b947 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -2241,7 +2241,7 @@ static int32_t smlInsertData(SSmlHandle* info) { } info->cost.insertRpcTime = taosGetTimestampUs(); - launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true, NULL); + launchQueryImpl(info->pRequest, info->pQuery, true, NULL); info->affectedRows = taos_affected_rows(info->pRequest); return info->pRequest->code; @@ -2340,7 +2340,7 @@ cleanup: */ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) { - SRequestObj* request = (SRequestObj*)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT); + SRequestObj* request = (SRequestObj*)createRequest((STscObj *)taos, NULL, TSDB_SQL_INSERT); if(!request){ uError("SML:taos_schemaless_insert error request is null"); return NULL; diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 01d785ef73..d3fabe71fe 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -773,11 +773,10 @@ int stmtExec(TAOS_STMT* stmt) { STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); if (STMT_TYPE_QUERY == pStmt->sql.type) { - launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true, NULL); + launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); } else { STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash)); - launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true, - (autoCreateTbl ? (void**)&pRsp : NULL)); + launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, (autoCreateTbl ? (void**)&pRsp : NULL)); } if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) { diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 914e5aefc2..1dbc1bcfb3 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -13,10 +13,12 @@ * along with this program. If not, see . */ -#include -#include -#include #include +#include +#include "taoserror.h" +#include "tglobal.h" +#include "thash.h" +#include "clientInt.h" #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wwrite-strings" @@ -24,7 +26,6 @@ #pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wsign-compare" -#include "../inc/clientInt.h" #include "taos.h" namespace { @@ -41,6 +42,47 @@ void showDB(TAOS* pConn) { printf("%s\n", str); } } + +void fetchCallback(void* param, void* res, int32_t numOfRow) { + printf("numOfRow = %d \n", numOfRow); + int numFields = taos_num_fields(res); + TAOS_FIELD *fields = taos_fetch_fields(res); + TAOS *_taos = (TAOS *)param; + if (numOfRow > 0) { + for (int i = 0; i < numOfRow; ++i) { + TAOS_ROW row = taos_fetch_row(res); + + char temp[256] = {0}; + taos_print_row(temp, row, fields, numFields); + printf("%s\n", temp); + } + taos_fetch_rows_a(res, fetchCallback, _taos); + } else { + printf("no more data, close the connection.\n"); +// taos_free_result(res); +// taos_close(_taos); +// taos_cleanup(); + } +} + +void queryCallback(void* param, void* res, int32_t code) { + if (code != TSDB_CODE_SUCCESS) { + printf("failed to execute, reason:%s\n", taos_errstr(res)); + } + printf("start to fetch data\n"); + taos_fetch_rows_a(res, fetchCallback, param); +} + +void queryCallback1(void* param, void* res, int32_t code) { + if (code != TSDB_CODE_SUCCESS) { + printf("failed to execute, reason:%s\n", taos_errstr(res)); + } + + taos_free_result(res); + + printf("exec query:\n"); + taos_query_a(param, "select * from tm1", queryCallback, param); +} } // namespace int main(int argc, char** argv) { @@ -480,9 +522,7 @@ TEST(testCase, show_table_Test) { TAOS_RES* pRes = taos_query(pConn, "show tables"); if (taos_errno(pRes) != 0) { printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); } - taos_free_result(pRes); taos_query(pConn, "use abc1"); @@ -567,7 +607,6 @@ TEST(testCase, insert_test) { taos_free_result(pRes); taos_close(pConn); } -#endif TEST(testCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -664,7 +703,8 @@ TEST(testCase, projection_query_tables) { // taos_free_result(pRes); taos_close(pConn); } -#if 0 +#endif + TEST(testCase, projection_query_stables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -693,6 +733,7 @@ TEST(testCase, projection_query_stables) { taos_close(pConn); } +#if 0 TEST(testCase, agg_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -705,7 +746,7 @@ TEST(testCase, agg_query_tables) { } taos_free_result(pRes); - pRes = taos_query(pConn, "explain analyze select count(*) from tu interval(1s)"); + pRes = taos_query(pConn, "show stables"); if (taos_errno(pRes) != 0) { printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); @@ -733,6 +774,41 @@ TEST(testCase, agg_query_tables) { taos_free_result(pRes); taos_close(pConn); } + +TEST(testCase, async_api_test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + taos_query_a(pConn, "create table abc1.txx(ts timestamp, k int)", queryCallback1, pConn); + getchar(); + +// if (taos_errno(pRes) != 0) { +// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } +// +// TAOS_ROW pRow = NULL; +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// int32_t n = 0; +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t* length = taos_fetch_lengths(pRes); +// for(int32_t i = 0; i < numOfFields; ++i) { +// printf("(%d):%d " , i, length[i]); +// } +// printf("\n"); +// +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// memset(str, 0, sizeof(str)); +// } +// +// taos_free_result(pRes); + taos_close(pConn); +} #endif #pragma GCC diagnostic pop diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index 217699e360..444fe24795 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -486,7 +486,7 @@ TEST(testCase, smlProcess_influx_Test) { pRes = taos_query(taos, "use inflx_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -607,7 +607,7 @@ TEST(testCase, smlParseLine_error_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -656,7 +656,7 @@ TEST(testCase, smlProcess_telnet_Test) { pRes = taos_query(taos, "use telnet_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -710,7 +710,7 @@ TEST(testCase, smlProcess_json1_Test) { pRes = taos_query(taos, "use json_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -779,7 +779,7 @@ TEST(testCase, smlProcess_json2_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -823,7 +823,7 @@ TEST(testCase, smlProcess_json3_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -895,7 +895,7 @@ TEST(testCase, smlProcess_json4_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -957,7 +957,7 @@ TEST(testCase, smlParseTelnetLine_error_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -1006,7 +1006,7 @@ TEST(testCase, smlParseTelnetLine_diff_type_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -1033,7 +1033,7 @@ TEST(testCase, smlParseTelnetLine_json_error_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -1101,7 +1101,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type1_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -1146,7 +1146,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -1191,7 +1191,7 @@ TEST(testCase, sml_TD15662_Test) { pRes = taos_query(taos, "use db_15662"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS); @@ -1218,7 +1218,7 @@ TEST(testCase, sml_TD15735_Test) { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); @@ -1244,7 +1244,7 @@ TEST(testCase, sml_TD15742_Test) { pRes = taos_query(taos, "use TD15742"); taos_free_result(pRes); - SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); + SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS); diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 38a6bafe9a..d8d7479a93 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -77,7 +77,7 @@ static const SSysDbTableSchema userDBSchema[] = { {.name = "replica", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, {.name = "strict", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "duration", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "keep", .bytes = 24 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "keep", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "buffer", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "pagesize", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "pages", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index c6a7f95d5e..4b7fdadc18 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2839,7 +2839,6 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg); int32_t num = taosArrayGetSize(dbCfg.pRetensions); if (TSDB_CODE_SUCCESS != code || num < 2) { - taosArrayDestroy(dbCfg.pRetensions); return code; } for (int32_t i = 1; i < num; ++i) { @@ -4942,9 +4941,6 @@ int32_t translate(SParseContext* pParseCxt, SQuery* pQuery) { STranslateContext cxt = {0}; int32_t code = initTranslateContext(pParseCxt, pQuery->pMetaCache, &cxt); - if (TSDB_CODE_SUCCESS == code) { - code = fmFuncMgtInit(); - } if (TSDB_CODE_SUCCESS == code) { code = rewriteQuery(&cxt, pQuery); } diff --git a/source/libs/scheduler/src/schDbg.c b/source/libs/scheduler/src/schDbg.c index 4b5f74114d..5c0c6fbb76 100644 --- a/source/libs/scheduler/src/schDbg.c +++ b/source/libs/scheduler/src/schDbg.c @@ -30,13 +30,4 @@ void schdExecCallback(SQueryResult* pResult, void* param, int32_t code) { tsem_post(&schdRspSem); } -void schdFetchCallback(void* pResult, void* param, int32_t code) { - SSchdFetchParam* fParam = (SSchdFetchParam*)param; - - *fParam->pData = pResult; - *fParam->code = code; - - tsem_post(&schdRspSem); -} - diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 3ecc4f4a30..e731b8e416 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -79,12 +79,19 @@ int32_t schedulerExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int6 int32_t schedulerAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, schedulerExecCallback fp, void* param) { - if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == fp || NULL == param) { - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } - - SSchResInfo resInfo = {.execFp = fp, .userParam = param}; - SCH_RET(schAsyncExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo)); + int32_t code = 0; + if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == fp) { + code = TSDB_CODE_QRY_INVALID_INPUT; + } else { + SSchResInfo resInfo = {.execFp = fp, .userParam = param}; + code = schAsyncExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo); + } + + if (code != TSDB_CODE_SUCCESS) { + fp(NULL, param, code); + } + + return code; } int32_t schedulerFetchRows(int64_t job, void **pData) { diff --git a/tests/script/tsim/testsuit.sim b/tests/script/tsim/testsuit.sim index 0b1f0df04e..64fe198dce 100644 --- a/tests/script/tsim/testsuit.sim +++ b/tests/script/tsim/testsuit.sim @@ -1,80 +1,103 @@ -#run user/pass_alter.sim -#run user/basic1.sim -#run user/privilege2.sim -#run user/user_len.sim -#run user/privilege1.sim -#run user/pass_len.sim -#run tstream/basic1.sim -#run tstream/basic0.sim -#run table/basic1.sim -#run trans/create_db.sim -#run stable/alter1.sim -#run stable/vnode3.sim -#run stable/metrics.sim -#run stable/show.sim -#run stable/values.sim -#run stable/dnode3.sim -#run stable/refcount.sim -#run stable/disk.sim -#run db/basic1.sim -#run db/basic3.sim -#run db/basic7.sim -#run db/basic6.sim -#run db/create_all_options.sim -#run db/basic2.sim -#run db/error1.sim -#run db/taosdlog.sim -#run db/alter_option.sim -#run mnode/basic1.sim -#run parser/fourArithmetic-basic.sim -#run parser/groupby-basic.sim -#run snode/basic1.sim -#run query/time_process.sim -#run query/stddev.sim -#run query/interval-offset.sim -#run query/charScalarFunction.sim -#run query/complex_select.sim -#run query/explain.sim -#run query/crash_sql.sim -#run query/diff.sim -#run query/complex_limit.sim -#run query/complex_having.sim -#run query/udf.sim -#run query/complex_group.sim -#run query/interval.sim -#run query/session.sim - -print ========> dead lock failed when 2 rows in outputCapacity -run query/scalarFunction.sim -run query/scalarNull.sim -run query/complex_where.sim -run tmq/basic1.sim -run tmq/basic4.sim -run tmq/basic1Of2Cons.sim -run tmq/prepareBasicEnv-1vgrp.sim -run tmq/topic.sim -run tmq/basic4Of2Cons.sim -run tmq/prepareBasicEnv-4vgrp.sim -run tmq/basic3.sim -run tmq/basic2Of2Cons.sim -run tmq/basic2.sim -run tmq/basic3Of2Cons.sim -run tmq/basic2Of2ConsOverlap.sim -run tmq/clearConsume.sim -run qnode/basic1.sim -run dnode/basic1.sim -run show/basic.sim -run insert/basic1.sim -run insert/basic0.sim -run insert/backquote.sim -run insert/null.sim -run sync/oneReplica1VgElectWithInsert.sim -run sync/threeReplica1VgElect.sim -run sync/oneReplica1VgElect.sim -run sync/insertDataByRunBack.sim -run sync/threeReplica1VgElectWihtInsert.sim -run sma/tsmaCreateInsertData.sim -run sma/rsmaCreateInsertQuery.sim -run valgrind/checkError.sim -run bnode/basic1.sim - +#run tsim/user/pass_alter.sim +#run tsim/user/basic1.sim +#run tsim/user/privilege2.sim +#run tsim/user/user_len.sim +run tsim/user/privilege1.sim +run tsim/user/pass_len.sim +run tsim/table/basic1.sim +run tsim/trans/lossdata1.sim +run tsim/trans/create_db.sim +run tsim/stable/alter_metrics.sim +run tsim/stable/tag_modify.sim +run tsim/stable/alter_comment.sim +run tsim/stable/column_drop.sim +run tsim/stable/column_modify.sim +run tsim/stable/tag_rename.sim +run tsim/stable/vnode3.sim +run tsim/stable/metrics.sim +run tsim/stable/alter_insert2.sim +run tsim/stable/show.sim +run tsim/stable/alter_import.sim +run tsim/stable/tag_add.sim +run tsim/stable/tag_drop.sim +run tsim/stable/column_add.sim +run tsim/stable/alter_count.sim +run tsim/stable/values.sim +run tsim/stable/dnode3.sim +run tsim/stable/alter_insert1.sim +run tsim/stable/refcount.sim +run tsim/stable/disk.sim +run tsim/db/basic1.sim +run tsim/db/basic3.sim +run tsim/db/basic7.sim +run tsim/db/basic6.sim +run tsim/db/create_all_options.sim +run tsim/db/basic2.sim +run tsim/db/error1.sim +run tsim/db/taosdlog.sim +run tsim/db/alter_option.sim +run tsim/mnode/basic1.sim +run tsim/mnode/basic3.sim +run tsim/mnode/basic2.sim +run tsim/parser/fourArithmetic-basic.sim +run tsim/parser/groupby-basic.sim +run tsim/snode/basic1.sim +run tsim/query/time_process.sim +run tsim/query/stddev.sim +run tsim/query/interval-offset.sim +run tsim/query/charScalarFunction.sim +run tsim/query/complex_select.sim +run tsim/query/explain.sim +run tsim/query/crash_sql.sim +run tsim/query/diff.sim +run tsim/query/complex_limit.sim +run tsim/query/complex_having.sim +run tsim/query/udf.sim +run tsim/query/complex_group.sim +run tsim/query/interval.sim +run tsim/query/session.sim +run tsim/query/scalarFunction.sim +run tsim/query/scalarNull.sim +run tsim/query/complex_where.sim +run tsim/tmq/basic1.sim +run tsim/tmq/basic4.sim +run tsim/tmq/basic1Of2Cons.sim +run tsim/tmq/prepareBasicEnv-1vgrp.sim +run tsim/tmq/topic.sim +run tsim/tmq/basic4Of2Cons.sim +run tsim/tmq/prepareBasicEnv-4vgrp.sim +run tsim/tmq/basic3.sim +run tsim/tmq/basic2Of2Cons.sim +run tsim/tmq/basic2.sim +run tsim/tmq/basic3Of2Cons.sim +run tsim/tmq/basic2Of2ConsOverlap.sim +run tsim/tmq/clearConsume.sim +run tsim/qnode/basic1.sim +run tsim/dnode/basic1.sim +run tsim/show/basic.sim +run tsim/stream/basic1.sim +run tsim/stream/triggerInterval0.sim +run tsim/stream/triggerSession0.sim +run tsim/stream/basic0.sim +run tsim/stream/session0.sim +run tsim/stream/session1.sim +run tsim/stream/basic2.sim +run tsim/insert/basic1.sim +run tsim/insert/commit-merge0.sim +run tsim/insert/basic0.sim +run tsim/insert/update0.sim +run tsim/insert/backquote.sim +run tsim/insert/null.sim +run tsim/sync/oneReplica1VgElectWithInsert.sim +run tsim/sync/threeReplica1VgElect.sim +run tsim/sync/oneReplica1VgElect.sim +run tsim/sync/3Replica5VgElect.sim +run tsim/sync/insertDataByRunBack.sim +run tsim/sync/oneReplica5VgElect.sim +run tsim/sync/3Replica1VgElect.sim +run tsim/sync/threeReplica1VgElectWihtInsert.sim +run tsim/sma/tsmaCreateInsertData.sim +run tsim/sma/rsmaCreateInsertQuery.sim +run tsim/valgrind/basic.sim +run tsim/valgrind/checkError.sim +run tsim/bnode/basic1.sim \ No newline at end of file