From 170182fecd1b65810edca8752fbc10453604482f Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 12 Jun 2023 16:10:54 +0800 Subject: [PATCH 1/5] feat: support fill history with sub request --- include/common/tmsg.h | 1 + include/libs/catalog/catalog.h | 1 + include/libs/function/functionMgt.h | 1 + include/libs/nodes/cmdnodes.h | 22 +- include/libs/nodes/plannodes.h | 1 + include/libs/nodes/querynodes.h | 2 + include/libs/parser/parser.h | 1 + include/libs/planner/planner.h | 1 + source/client/inc/clientInt.h | 16 ++ source/client/src/clientEnv.c | 42 ++++ source/client/src/clientImpl.c | 193 ++++++++++++++++- source/client/src/clientMain.c | 282 ++++++++++++++++--------- source/common/src/tmsg.c | 5 + source/libs/function/src/functionMgt.c | 2 +- source/libs/nodes/src/nodesUtilFuncs.c | 3 + source/libs/parser/inc/parInt.h | 1 + source/libs/parser/src/parAstParser.c | 4 + source/libs/parser/src/parTranslater.c | 176 +++++++++++++-- source/libs/parser/src/parser.c | 15 +- source/libs/planner/src/planner.c | 6 + 20 files changed, 636 insertions(+), 139 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d78e771fcf..86308984ef 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1975,6 +1975,7 @@ typedef struct { SArray* fillNullCols; // array of SColLocation int64_t deleteMark; int8_t igUpdate; + int64_t lastTs; } SCMCreateStreamReq; typedef struct { diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 9300deeb9a..7a7a13b285 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -87,6 +87,7 @@ typedef struct SCatalogReq { bool dNodeRequired; // valid dnode bool svrVerRequired; bool forceUpdate; + bool cloned; } SCatalogReq; typedef struct SMetaRes { diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index f0c9cffd0f..55af50e0bc 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -233,6 +233,7 @@ bool fmIsGroupKeyFunc(int32_t funcId); bool fmIsBlockDistFunc(int32_t funcId); void getLastCacheDataType(SDataType* pType); +SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList); int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc); diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index c8ce9634f5..3a36601b11 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -425,16 +425,18 @@ typedef struct SStreamOptions { } SStreamOptions; typedef struct SCreateStreamStmt { - ENodeType type; - char streamName[TSDB_TABLE_NAME_LEN]; - char targetDbName[TSDB_DB_NAME_LEN]; - char targetTabName[TSDB_TABLE_NAME_LEN]; - bool ignoreExists; - SStreamOptions* pOptions; - SNode* pQuery; - SNodeList* pTags; - SNode* pSubtable; - SNodeList* pCols; + ENodeType type; + char streamName[TSDB_TABLE_NAME_LEN]; + char targetDbName[TSDB_DB_NAME_LEN]; + char targetTabName[TSDB_TABLE_NAME_LEN]; + bool ignoreExists; + SStreamOptions* pOptions; + SNode* pQuery; + SNode* pPrevQuery; + SNodeList* pTags; + SNode* pSubtable; + SNodeList* pCols; + SCMCreateStreamReq* pReq; } SCreateStreamStmt; typedef struct SDropStreamStmt { diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 02459ed951..f44b622cc0 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -617,6 +617,7 @@ typedef struct SQueryPlan { int32_t numOfSubplans; SNodeList* pSubplans; // Element is SNodeListNode. The execution level of subplan, starting from 0. SExplainInfo explainInfo; + void* pPostPlan; } SQueryPlan; const char* dataOrderStr(EDataOrderLevel order); diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 12890571f9..f570698395 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -441,7 +441,9 @@ typedef struct SQuery { EQueryExecStage execStage; EQueryExecMode execMode; bool haveResultSet; + SNode* pPrevRoot; SNode* pRoot; + SNode* pPostRoot; int32_t numOfResCols; SSchema* pResSchema; int8_t precision; diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 94fb6824d2..f253b47e50 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -74,6 +74,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata const struct SMetaData* pMetaData, SQuery* pQuery); int32_t qContinueParseSql(SParseContext* pCxt, struct SCatalogReq* pCatalogReq, const struct SMetaData* pMetaData, SQuery* pQuery); +int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, void** pResRow); void qDestroyParseContext(SParseContext* pCxt); diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 41c0e98084..1b523c0323 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -52,6 +52,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo // @groupId id of a group of datasource subplans of this @pSubplan // @pSource one execution location of this group of datasource subplans int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource); +int32_t qContinuePlanPostQuery(void *pPostPlan); void qClearSubplanExecutionNode(SSubplan* pSubplan); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 18891bb932..afa19c674e 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -227,6 +227,12 @@ typedef struct { STaosxRsp rsp; } SMqTaosxRspObj; +typedef struct SReqRelInfo { + uint64_t userRefId; + uint64_t prevRefId; + uint64_t nextRefId; +} SReqRelInfo; + typedef struct SRequestObj { int8_t resType; // query or tmq uint64_t requestId; @@ -254,6 +260,9 @@ typedef struct SRequestObj { uint32_t retry; int64_t allocatorRefId; SQuery* pQuery; + void* pPostPlan; + SReqRelInfo relation; + void* pWrapper; } SRequestObj; typedef struct SSyncQueryParam { @@ -279,6 +288,7 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly); void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, int64_t reqid); +void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param); int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols); @@ -368,6 +378,7 @@ typedef struct SSqlCallbackWrapper { SParseContext* pParseCtx; SCatalogReq* pCatalogReq; SRequestObj* pRequest; + void* pPlanInfo; } SSqlCallbackWrapper; SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res); @@ -382,6 +393,11 @@ int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog); bool qnodeRequired(SRequestObj* pRequest); void continueInsertFromCsv(SSqlCallbackWrapper* pWrapper, SRequestObj* pRequest); void destorySqlCallbackWrapper(SSqlCallbackWrapper* pWrapper); +void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code); +void restartAsyncQuery(SRequestObj *pRequest, int32_t code); +int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj** pNewRequest); +int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce); +void returnToUser(SRequestObj* pRequest); #ifdef __cplusplus } diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 045642c2c2..4f883ca1f4 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -358,6 +358,44 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); } + +void destroySubRequests(SRequestObj *pRequest) { + int32_t reqIdx = -1; + SRequestObj *pReqList[16] = {NULL}; + uint64_t tmpRefId = 0; + SRequestObj* pTmp = pRequest; + while (pTmp->relation.prevRefId) { + tmpRefId = pTmp->relation.prevRefId; + pTmp = acquireRequest(tmpRefId); + if (pTmp) { + pReqList[++reqIdx] = pTmp; + releaseRequest(tmpRefId); + } else { + tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, + tmpRefId, pTmp->requestId); + break; + } + } + + for (int32_t i = reqIdx; i >= 0; i--) { + removeRequest(pReqList[i]->self); + } + + tmpRefId = pRequest->relation.nextRefId; + while (tmpRefId) { + pTmp = acquireRequest(tmpRefId); + if (pTmp) { + tmpRefId = pTmp->relation.nextRefId; + removeRequest(pTmp->self); + releaseRequest(pTmp->self); + } else { + tscError("0x%" PRIx64 " is not there", tmpRefId); + break; + } + } +} + + void doDestroyRequest(void *p) { if (NULL == p) { return; @@ -368,10 +406,14 @@ void doDestroyRequest(void *p) { uint64_t reqId = pRequest->requestId; tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); + destroySubRequests(pRequest); + taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); schedulerFreeJob(&pRequest->body.queryJob, 0); + destorySqlCallbackWrapper(pRequest->pWrapper); + taosMemoryFreeClear(pRequest->msgBuf); taosMemoryFreeClear(pRequest->pDb); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 5963e419e1..de14266099 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -237,6 +237,16 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, return TSDB_CODE_SUCCESS; } +int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj** pNewRequest) { + int32_t code = buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0); + if (TSDB_CODE_SUCCESS == code) { + pRequest->relation.prevRefId = (*pNewRequest)->self; + (*pNewRequest)->relation.nextRefId = pRequest->self; + (*pNewRequest)->relation.userRefId = pRequest->self; + } + return code; +} + int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) { STscObj* pTscObj = pRequest->pTscObj; @@ -878,6 +888,81 @@ static bool incompletaFileParsing(SNode* pStmt) { return QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pStmt) ? false : ((SVnodeModifyOpStmt*)pStmt)->fileProcessing; } +void continuePostSubQuery(SRequestObj* pRequest, TAOS_ROW row) { + SSqlCallbackWrapper* pWrapper = pRequest->pWrapper; + int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId); + if (TSDB_CODE_SUCCESS == code) { + int64_t analyseStart = taosGetTimestampUs(); + code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, (void**)row); + pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart; + } + if (TSDB_CODE_SUCCESS == code) { + code = qContinuePlanPostQuery(pRequest->pPostPlan); + } + nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId); + + handleQueryAnslyseRes(pWrapper, NULL, code); +} + +void returnToUser(SRequestObj* pRequest) { + if (pRequest->relation.userRefId == pRequest->self || 0 == pRequest->relation.userRefId) { + // return to client + pRequest->body.queryFp(pRequest->body.param, pRequest, pRequest->code); + return; + } + + SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId); + if (pUserReq) { + pUserReq->code = pRequest->code; + // return to client + pUserReq->body.queryFp(pUserReq->body.param, pUserReq, pUserReq->code); + releaseRequest(pRequest->relation.userRefId); + return; + } else { + tscError("0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pRequest->self, + pRequest->relation.userRefId, pRequest->requestId); + } +} + +void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) { + SRequestObj* pRequest = (SRequestObj*)res; + if (pRequest->code) { + returnToUser(pRequest); + return; + } + + TAOS_ROW row = NULL; + if (rowNum > 0) { + row = taos_fetch_row(res); // for single row only now + } + + SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId); + if (pNextReq) { + continuePostSubQuery(pNextReq, row); + releaseRequest(pRequest->relation.nextRefId); + } else { + tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pRequest->self, + pRequest->relation.nextRefId, pRequest->requestId); + } +} + +void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) { + SRequestObj* pRequest = pWrapper->pRequest; + if (TD_RES_QUERY(pRequest)) { + taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper); + return; + } + + SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId); + if (pNextReq) { + continuePostSubQuery(pNextReq, NULL); + releaseRequest(pRequest->relation.nextRefId); + } else { + tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pRequest->self, + pRequest->relation.nextRefId, pRequest->requestId); + } +} + // todo refacto the error code mgmt void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { SSqlCallbackWrapper* pWrapper = param; @@ -912,12 +997,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) { tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId); - pRequest->prevCode = code; - schedulerFreeJob(&pRequest->body.queryJob, 0); - qDestroyQuery(pRequest->pQuery); - pRequest->pQuery = NULL; - destorySqlCallbackWrapper(pWrapper); - doAsyncQuery(pRequest, true); + restartAsyncQuery(pRequest, code); return; } @@ -938,10 +1018,15 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { return; } - destorySqlCallbackWrapper(pWrapper); + if (pRequest->relation.nextRefId) { + handlePostSubQuery(pWrapper); + } else { + destorySqlCallbackWrapper(pWrapper); + pRequest->pWrapper = NULL; - // return to client - pRequest->body.queryFp(pRequest->body.param, pRequest, code); + // return to client + pRequest->body.queryFp(pRequest->body.param, pRequest, code); + } } SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) { @@ -1049,6 +1134,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat pRequest->requestId); } else { pRequest->body.subplanNum = pDag->numOfSubplans; + TSWAP(pRequest->pPostPlan, pDag->pPostPlan); } pRequest->metric.execStart = taosGetTimestampUs(); @@ -1084,6 +1170,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat tscDebug("0x%" PRIx64 " plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); destorySqlCallbackWrapper(pWrapper); + pRequest->pWrapper = NULL; if (TSDB_CODE_SUCCESS != code) { pRequest->code = terrno; } @@ -1103,6 +1190,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM pRequest->body.execMode = pQuery->execMode; if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) { destorySqlCallbackWrapper(pWrapper); + pRequest->pWrapper = NULL; } if (pQuery->pRoot && !pRequest->inRetry) { @@ -2402,3 +2490,90 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, return pRequest; } + + +static void fetchCallback(void *pResult, void *param, int32_t code) { + SRequestObj *pRequest = (SRequestObj *)param; + + SReqResultInfo *pResultInfo = &pRequest->body.resInfo; + + tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, + tstrerror(code), pRequest->requestId); + + pResultInfo->pData = pResult; + pResultInfo->numOfRows = 0; + + if (code != TSDB_CODE_SUCCESS) { + pRequest->code = code; + taosMemoryFreeClear(pResultInfo->pData); + pRequest->body.fetchFp(pRequest->body.param, pRequest, 0); + return; + } + + if (pRequest->code != TSDB_CODE_SUCCESS) { + taosMemoryFreeClear(pResultInfo->pData); + pRequest->body.fetchFp(pRequest->body.param, pRequest, 0); + return; + } + + pRequest->code = + setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, true); + if (pRequest->code != TSDB_CODE_SUCCESS) { + pResultInfo->numOfRows = 0; + pRequest->code = code; + tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code), + pRequest->requestId); + } else { + tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, + pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, + pRequest->requestId); + + STscObj *pTscObj = pRequest->pTscObj; + SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; + atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); + } + + pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); +} + +void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param) { + pRequest->body.fetchFp = fp; + pRequest->body.param = param; + + SReqResultInfo *pResultInfo = &pRequest->body.resInfo; + + // this query has no results or error exists, return directly + if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) { + pResultInfo->numOfRows = 0; + pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows); + return; + } + + // all data has returned to App already, no need to try again + if (pResultInfo->completed) { + // it is a local executed query, no need to do async fetch + if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) { + if (pResultInfo->localResultFetched) { + pResultInfo->numOfRows = 0; + pResultInfo->current = 0; + } else { + pResultInfo->localResultFetched = true; + } + } else { + pResultInfo->numOfRows = 0; + } + + pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows); + return; + } + + SSchedulerReq req = { + .syncReq = false, + .fetchFp = fetchCallback, + .cbParam = pRequest, + }; + + schedulerFetchRows(pRequest->body.queryJob, &req); +} + + diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 63a4e5d2e5..ca4acf3edf 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -774,8 +774,13 @@ static void destoryCatalogReq(SCatalogReq *pCatalogReq) { taosArrayDestroy(pCatalogReq->pDbVgroup); taosArrayDestroy(pCatalogReq->pDbCfg); taosArrayDestroy(pCatalogReq->pDbInfo); - taosArrayDestroyEx(pCatalogReq->pTableMeta, destoryTablesReq); - taosArrayDestroyEx(pCatalogReq->pTableHash, destoryTablesReq); + if (pCatalogReq->cloned) { + taosArrayDestroy(pCatalogReq->pTableMeta); + taosArrayDestroy(pCatalogReq->pTableHash); + } else { + taosArrayDestroyEx(pCatalogReq->pTableMeta, destoryTablesReq); + taosArrayDestroyEx(pCatalogReq->pTableHash, destoryTablesReq); + } taosArrayDestroy(pCatalogReq->pUdf); taosArrayDestroy(pCatalogReq->pIndex); taosArrayDestroy(pCatalogReq->pUser); @@ -794,26 +799,108 @@ void destorySqlCallbackWrapper(SSqlCallbackWrapper *pWrapper) { taosMemoryFree(pWrapper); } +void destroyCtxInRequest(SRequestObj* pRequest) { + schedulerFreeJob(&pRequest->body.queryJob, 0); + qDestroyQuery(pRequest->pQuery); + pRequest->pQuery = NULL; + destorySqlCallbackWrapper(pRequest->pWrapper); + pRequest->pWrapper = NULL; +} + + static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t code) { SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param; SRequestObj *pRequest = pWrapper->pRequest; SQuery *pQuery = pRequest->pQuery; - int64_t analyseStart = taosGetTimestampUs(); - pRequest->metric.ctgCostUs = analyseStart - pRequest->metric.ctgStart; qDebug("0x%" PRIx64 " start to semantic analysis, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId); - if (code == TSDB_CODE_SUCCESS) { + int64_t analyseStart = taosGetTimestampUs(); + pRequest->metric.ctgCostUs = analyseStart - pRequest->metric.ctgStart; + + if (TSDB_CODE_SUCCESS == code) { code = qAnalyseSqlSemantic(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery); + } + + pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart; + + handleQueryAnslyseRes(pWrapper, pResultMeta, code); +} + +int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) { + int32_t code = TSDB_CODE_SUCCESS; + SCatalogReq* pTarget = taosMemoryCalloc(1, sizeof(SCatalogReq)); + if (pTarget == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { + pTarget->pDbVgroup = taosArrayDup(pSrc->pDbVgroup, NULL); + pTarget->pDbCfg = taosArrayDup(pSrc->pDbCfg, NULL); + pTarget->pDbInfo = taosArrayDup(pSrc->pDbInfo, NULL); + pTarget->pTableMeta = taosArrayDup(pSrc->pTableMeta, NULL); + pTarget->pTableHash = taosArrayDup(pSrc->pTableHash, NULL); + pTarget->pUdf = taosArrayDup(pSrc->pUdf, NULL); + pTarget->pIndex = taosArrayDup(pSrc->pIndex, NULL); + pTarget->pUser = taosArrayDup(pSrc->pUser, NULL); + pTarget->pTableIndex = taosArrayDup(pSrc->pTableIndex, NULL); + pTarget->pTableCfg = taosArrayDup(pSrc->pTableCfg, NULL); + pTarget->pTableTag = taosArrayDup(pSrc->pTableTag, NULL); + pTarget->qNodeRequired = pSrc->qNodeRequired; + pTarget->dNodeRequired = pSrc->dNodeRequired; + pTarget->svrVerRequired = pSrc->svrVerRequired; + pTarget->forceUpdate = pSrc->forceUpdate; + pTarget->cloned = true; + + *ppTarget = pTarget; + } + + return code; +} + + +void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode* pRoot) { + SRequestObj* pNewRequest = NULL; + SSqlCallbackWrapper* pNewWrapper = NULL; + int32_t code = buildPreviousRequest(pWrapper->pRequest, "", &pNewRequest); + if (code) { + handleQueryAnslyseRes(pWrapper, pResultMeta, code); + return; + } + + pNewRequest->pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); + if (NULL == pNewRequest->pQuery) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { + pNewRequest->pQuery->pRoot = pRoot; + pRoot = NULL; + pNewRequest->pQuery->execStage = QUERY_EXEC_STAGE_ANALYSE; + } + if (TSDB_CODE_SUCCESS == code) { + code = prepareAndParseSqlSyntax(&pNewWrapper, pNewRequest, false); + } + if (TSDB_CODE_SUCCESS == code) { + code = cloneCatalogReq(&pNewWrapper->pCatalogReq, pWrapper->pCatalogReq); + } + doAsyncQueryFromAnalyse(pResultMeta, pNewWrapper, code); + nodesDestroyNode(pRoot); +} + +void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code) { + SRequestObj *pRequest = pWrapper->pRequest; + SQuery *pQuery = pRequest->pQuery; + + if (code == TSDB_CODE_SUCCESS && pQuery->pPrevRoot) { + SNode* prevRoot = pQuery->pPrevRoot; + pQuery->pPrevRoot = NULL; + handleSubQueryFromAnalyse(pWrapper, pResultMeta, prevRoot); + return; + } + + if (code == TSDB_CODE_SUCCESS) { pRequest->stableQuery = pQuery->stableQuery; if (pQuery->pRoot) { pRequest->stmtType = pQuery->pRoot->type; } - } - pRequest->metric.analyseCostUs = taosGetTimestampUs() - analyseStart; - - if (code == TSDB_CODE_SUCCESS) { if (pQuery->haveResultSet) { setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols); setResPrecision(&pRequest->body.resInfo, pQuery->precision); @@ -826,14 +913,14 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t launchAsyncQuery(pRequest, pQuery, pResultMeta, pWrapper); } else { destorySqlCallbackWrapper(pWrapper); + pRequest->pWrapper = NULL; qDestroyQuery(pRequest->pQuery); pRequest->pQuery = NULL; if (NEED_CLIENT_HANDLE_ERROR(code)) { tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); - pRequest->prevCode = code; - doAsyncQuery(pRequest, true); + restartAsyncQuery(pRequest, code); return; } @@ -841,7 +928,7 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:0x%" PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); pRequest->code = code; - pRequest->body.queryFp(pRequest->body.param, pRequest, code); + returnToUser(pRequest); } } @@ -904,6 +991,7 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pWrapper->pRequest->self, code, tstrerror(code), pWrapper->pRequest->requestId); destorySqlCallbackWrapper(pWrapper); + pRequest->pWrapper = NULL; terrno = code; pRequest->code = code; pRequest->body.queryFp(pRequest->body.param, pRequest, code); @@ -920,6 +1008,7 @@ void continueInsertFromCsv(SSqlCallbackWrapper *pWrapper, SRequestObj *pRequest) tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pWrapper->pRequest->self, code, tstrerror(code), pWrapper->pRequest->requestId); destorySqlCallbackWrapper(pWrapper); + pRequest->pWrapper = NULL; terrno = code; pRequest->code = code; pRequest->body.queryFp(pRequest->body.param, pRequest, code); @@ -967,27 +1056,16 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) { return TSDB_CODE_SUCCESS; } -void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { +int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce) { + int32_t code = TSDB_CODE_SUCCESS; STscObj *pTscObj = pRequest->pTscObj; - SSqlCallbackWrapper *pWrapper = NULL; - int32_t code = TSDB_CODE_SUCCESS; - - if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) { - code = pRequest->prevCode; - terrno = code; - pRequest->code = code; - tscDebug("call sync query cb with code: %s", tstrerror(code)); - pRequest->body.queryFp(pRequest->body.param, pRequest, code); - return; - } - - if (TSDB_CODE_SUCCESS == code) { - pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper)); - if (pWrapper == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - } else { - pWrapper->pRequest = pRequest; - } + SSqlCallbackWrapper *pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper)); + if (pWrapper == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { + pWrapper->pRequest = pRequest; + pRequest->pWrapper = pWrapper; + *ppWrapper = pWrapper; } if (TSDB_CODE_SUCCESS == code) { @@ -999,7 +1077,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pWrapper->pParseCtx->pCatalog); } - if (TSDB_CODE_SUCCESS == code) { + if (TSDB_CODE_SUCCESS == code && NULL == pRequest->pQuery) { int64_t syntaxStart = taosGetTimestampUs(); pWrapper->pCatalogReq = taosMemoryCalloc(1, sizeof(SCatalogReq)); @@ -1014,6 +1092,27 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { pRequest->metric.parseCostUs += taosGetTimestampUs() - syntaxStart; } + return code; +} + + +void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { + SSqlCallbackWrapper *pWrapper = NULL; + int32_t code = TSDB_CODE_SUCCESS; + + if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) { + code = pRequest->prevCode; + terrno = code; + pRequest->code = code; + tscDebug("call sync query cb with code: %s", tstrerror(code)); + pRequest->body.queryFp(pRequest->body.param, pRequest, code); + return; + } + + if (TSDB_CODE_SUCCESS == code) { + code = prepareAndParseSqlSyntax(&pWrapper, pRequest, updateMetaForce); + } + if (TSDB_CODE_SUCCESS == code) { pRequest->stmtType = pRequest->pQuery->pRoot->type; code = phaseAsyncQuery(pWrapper); @@ -1023,6 +1122,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); destorySqlCallbackWrapper(pWrapper); + pRequest->pWrapper = NULL; qDestroyQuery(pRequest->pQuery); pRequest->pQuery = NULL; @@ -1040,48 +1140,57 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { } } -static void fetchCallback(void *pResult, void *param, int32_t code) { - SRequestObj *pRequest = (SRequestObj *)param; - - SReqResultInfo *pResultInfo = &pRequest->body.resInfo; - - tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, - tstrerror(code), pRequest->requestId); - - pResultInfo->pData = pResult; - pResultInfo->numOfRows = 0; - - if (code != TSDB_CODE_SUCCESS) { - pRequest->code = code; - taosMemoryFreeClear(pResultInfo->pData); - pRequest->body.fetchFp(pRequest->body.param, pRequest, 0); - return; +void restartAsyncQuery(SRequestObj *pRequest, int32_t code) { + int32_t reqIdx = 0; + SRequestObj *pReqList[16] = {NULL}; + SRequestObj *pUserReq = NULL; + pReqList[0] = pRequest; + uint64_t tmpRefId = 0; + SRequestObj* pTmp = pRequest; + while (pTmp->relation.prevRefId) { + tmpRefId = pTmp->relation.prevRefId; + pTmp = acquireRequest(tmpRefId); + if (pTmp) { + pReqList[++reqIdx] = pTmp; + releaseRequest(tmpRefId); + } else { + tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, + tmpRefId, pTmp->requestId); + break; + } } - if (pRequest->code != TSDB_CODE_SUCCESS) { - taosMemoryFreeClear(pResultInfo->pData); - pRequest->body.fetchFp(pRequest->body.param, pRequest, 0); - return; + tmpRefId = pRequest->relation.nextRefId; + while (tmpRefId) { + pTmp = acquireRequest(tmpRefId); + if (pTmp) { + tmpRefId = pTmp->relation.nextRefId; + removeRequest(pTmp->self); + releaseRequest(pTmp->self); + } else { + tscError("0x%" PRIx64 " is not there", tmpRefId); + break; + } } - pRequest->code = - setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, true); - if (pRequest->code != TSDB_CODE_SUCCESS) { - pResultInfo->numOfRows = 0; - pRequest->code = code; - tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code), - pRequest->requestId); + for (int32_t i = reqIdx; i >= 0; i--) { + destroyCtxInRequest(pReqList[i]); + if (pReqList[i]->relation.userRefId == pReqList[i]->self || 0 == pReqList[i]->relation.userRefId) { + pUserReq = pReqList[i]; + } else { + removeRequest(pReqList[i]->self); + } + } + + if (pUserReq) { + pUserReq->prevCode = code; + memset(&pUserReq->relation, 0, sizeof(pUserReq->relation)); } else { - tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, - pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, - pRequest->requestId); - - STscObj *pTscObj = pRequest->pTscObj; - SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; - atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); + tscError("user req is missing"); + return; } - pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); + doAsyncQuery(pUserReq, true); } void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { @@ -1095,43 +1204,8 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { } SRequestObj *pRequest = res; - pRequest->body.fetchFp = fp; - pRequest->body.param = param; - SReqResultInfo *pResultInfo = &pRequest->body.resInfo; - - // this query has no results or error exists, return directly - if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) { - pResultInfo->numOfRows = 0; - pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows); - return; - } - - // all data has returned to App already, no need to try again - if (pResultInfo->completed) { - // it is a local executed query, no need to do async fetch - if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) { - if (pResultInfo->localResultFetched) { - pResultInfo->numOfRows = 0; - pResultInfo->current = 0; - } else { - pResultInfo->localResultFetched = true; - } - } else { - pResultInfo->numOfRows = 0; - } - - pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows); - return; - } - - SSchedulerReq req = { - .syncReq = false, - .fetchFp = fetchCallback, - .cbParam = pRequest, - }; - - schedulerFetchRows(pRequest->body.queryJob, &req); + taosAsyncFetchImpl(pRequest, fp, param); } void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ac035e0a2b..cd874dad50 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6122,6 +6122,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS } if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1; if (tEncodeI8(&encoder, pReq->igUpdate) < 0) return -1; + if (tEncodeI64(&encoder, pReq->lastTs) < 0) return -1; tEndEncode(&encoder); @@ -6207,6 +6208,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igUpdate) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->lastTs) < 0) return -1; tEndDecode(&decoder); @@ -6273,6 +6275,9 @@ int32_t tDeserializeSMRecoverStreamReq(void *buf, int32_t bufLen, SMRecoverStrea } void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { + if (NULL == pReq) { + return; + } taosArrayDestroy(pReq->pTags); taosMemoryFreeClear(pReq->sql); taosMemoryFreeClear(pReq->ast); diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 18f6e8050b..327bc7da71 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -344,7 +344,7 @@ static int32_t getFuncInfo(SFunctionNode* pFunc) { return fmGetFuncInfo(pFunc, msg, sizeof(msg)); } -static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) { +SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) { SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); if (NULL == pFunc) { return NULL; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 39e288f694..ee2bc7e442 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -953,6 +953,8 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pStmt->pQuery); nodesDestroyList(pStmt->pTags); nodesDestroyNode(pStmt->pSubtable); + tFreeSCMCreateStreamReq(pStmt->pReq); + taosMemoryFreeClear(pStmt->pReq); break; } case QUERY_NODE_DROP_STREAM_STMT: // no pointer field @@ -1052,6 +1054,7 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_QUERY: { SQuery* pQuery = (SQuery*)pNode; nodesDestroyNode(pQuery->pRoot); + nodesDestroyNode(pQuery->pPostRoot); taosMemoryFreeClear(pQuery->pResSchema); if (NULL != pQuery->pCmdMsg) { taosMemoryFreeClear(pQuery->pCmdMsg->pMsg); diff --git a/source/libs/parser/inc/parInt.h b/source/libs/parser/inc/parInt.h index 66aec272d7..d79aa84bb8 100644 --- a/source/libs/parser/inc/parInt.h +++ b/source/libs/parser/inc/parInt.h @@ -34,6 +34,7 @@ int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache); int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery); +int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow); #ifdef __cplusplus } diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 801d43e2a4..fdec9cba79 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -384,6 +384,10 @@ static int32_t collectMetaKeyFromCreateStream(SCollectMetaKeyCxt* pCxt, SCreateS if (TSDB_CODE_SUCCESS == code) { code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery); } + if (TSDB_CODE_SUCCESS == code && pStmt->pOptions->fillHistory) { + SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; + code = reserveDbCfgForLastRow(pCxt, pSelect->pFromTable); + } return code; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index d1c3da0041..905b32a5f3 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -53,6 +53,8 @@ typedef struct STranslateContext { bool createStream; bool stableQuery; bool showRewrite; + SNode* pPrevRoot; + SNode* pPostRoot; } STranslateContext; typedef struct SBuildTopicContext { @@ -276,6 +278,10 @@ static const SSysTableShowAdapter sysTableShowAdapter[] = { static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode); static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode); static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal); +static int32_t createSimpleSelectStmtFromProjList(const char* pDb, const char* pTable, SNodeList* pProjectionList, SSelectStmt** pStmt); +static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta, SNode** pQuery); +static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery); +static int32_t setRefreshMate(STranslateContext* pCxt, SQuery* pQuery); static bool afterGroupBy(ESqlClause clause) { return clause > SQL_CLAUSE_GROUP_BY; } @@ -6730,6 +6736,54 @@ static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStream return code; } +static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta, SNode** pQuery) { + SColumnNode* col = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == col) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + strcpy(col->tableAlias, pTable); + strcpy(col->colName, pMeta->schema[0].name); + SNodeList* pParamterList = nodesMakeList(); + if (NULL == pParamterList) { + nodesDestroyNode((SNode *)col); + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t code = nodesListStrictAppend(pParamterList, (SNode *)col); + if (code) { + nodesDestroyNode((SNode *)col); + nodesDestroyList(pParamterList); + return code; + } + + SNode* pFunc = (SNode*)createFunction("last", pParamterList); + if (NULL == pFunc) { + nodesDestroyList(pParamterList); + return TSDB_CODE_OUT_OF_MEMORY; + } + + SNodeList* pProjectionList = nodesMakeList(); + if (NULL == pProjectionList) { + nodesDestroyList(pParamterList); + return TSDB_CODE_OUT_OF_MEMORY; + } + code = nodesListStrictAppend(pProjectionList, pFunc); + if (code) { + nodesDestroyNode(pFunc); + nodesDestroyList(pProjectionList); + return code; + } + + code = createSimpleSelectStmtFromProjList(pDb, pTable, pProjectionList, (SSelectStmt **)pQuery); + if (code) { + nodesDestroyList(pProjectionList); + return code; + } + + return code; +} + static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) { pCxt->createStream = true; STableMeta* pMeta = NULL; @@ -6756,6 +6810,18 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB); code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL); } + if (TSDB_CODE_SUCCESS == code && pStmt->pOptions->fillHistory) { + SRealTableNode* pTable = (SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable); + code = createLastTsSelectStmt(pTable->table.dbName, pTable->table.tableName, pTable->pMeta, &pStmt->pPrevQuery); +/* + if (TSDB_CODE_SUCCESS == code) { + STranslateContext cxt = {0}; + int32_t code = initTranslateContext(pCxt->pParseCxt, pCxt->pMetaCache, &cxt); + code = translateQuery(&cxt, pStmt->pPrevQuery); + destroyTranslateContext(&cxt); + } +*/ + } taosMemoryFree(pMeta); return code; } @@ -6822,13 +6888,80 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt* code = buildCreateStreamReq(pCxt, pStmt, &createReq); } if (TSDB_CODE_SUCCESS == code) { - code = buildCmdMsg(pCxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, &createReq); + if (NULL == pStmt->pPrevQuery) { + code = buildCmdMsg(pCxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, &createReq); + } else { + pStmt->pReq = taosMemoryMalloc(sizeof(createReq)); + if (NULL == pStmt->pReq) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { + memcpy(pStmt->pReq, &createReq, sizeof(createReq)); + memset(&createReq, 0, sizeof(createReq)); + TSWAP(pCxt->pPrevRoot, pStmt->pPrevQuery); + } + } } tFreeSCMCreateStreamReq(&createReq); return code; } +int32_t buildIntervalForCreateStream(SCreateStreamStmt* pStmt, SInterval* pInterval) { + int32_t code = TSDB_CODE_SUCCESS; + if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery)) { + return code; + } + SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; + if (NULL == pSelect->pWindow || QUERY_NODE_INTERVAL_WINDOW != nodeType(pSelect->pWindow)) { + return code; + } + + SIntervalWindowNode* pWindow = (SIntervalWindowNode*)pSelect->pWindow; + pInterval->interval = ((SValueNode*)pWindow->pInterval)->datum.i; + pInterval->intervalUnit = ((SValueNode*)pWindow->pInterval)->unit; + pInterval->offset = (NULL != pWindow->pOffset ? ((SValueNode*)pWindow->pOffset)->datum.i : 0); + pInterval->sliding = (NULL != pWindow->pSliding ? ((SValueNode*)pWindow->pSliding)->datum.i : pInterval->interval); + pInterval->slidingUnit = + (NULL != pWindow->pSliding ? ((SValueNode*)pWindow->pSliding)->unit : pInterval->intervalUnit); + pInterval->precision = ((SColumnNode*)pWindow->pCol)->node.resType.precision; + + return code; +} + +int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow) { + SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pQuery->pRoot; + STranslateContext cxt = {0}; + SInterval interval = {0}; + + if (NULL == pResRow || NULL == pResRow[0]) { + return TSDB_CODE_PAR_INTERNAL_ERROR; + } + + int32_t code = initTranslateContext(pParseCxt, NULL, &cxt); + if (TSDB_CODE_SUCCESS == code) { + code = buildIntervalForCreateStream(pStmt, &interval); + } + if (TSDB_CODE_SUCCESS == code) { + if (interval.interval > 0) { + pStmt->pReq->lastTs = taosTimeTruncate(*(int64_t*)pResRow[0], &interval); + } else { + pStmt->pReq->lastTs = *(int64_t*)pResRow[0]; + } + code = buildCmdMsg(&cxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, pStmt->pReq); + } + if (TSDB_CODE_SUCCESS == code) { + code = setQuery(&cxt, pQuery); + } + setRefreshMate(&cxt, pQuery); + destroyTranslateContext(&cxt); + + tFreeSCMCreateStreamReq(pStmt->pReq); + taosMemoryFreeClear(pStmt->pReq); + + return code; +} + + static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pStmt) { SMDropStreamReq dropReq = {0}; SName name; @@ -7509,8 +7642,7 @@ static SNodeList* createProjectCols(int32_t ncols, const char* const pCols[]) { return pProjections; } -static int32_t createSimpleSelectStmt(const char* pDb, const char* pTable, int32_t numOfProjs, - const char* const pProjCol[], SSelectStmt** pStmt) { +static int32_t createSimpleSelectStmtImpl(const char* pDb, const char* pTable, SNodeList* pProjectionList, SSelectStmt** pStmt) { SSelectStmt* pSelect = (SSelectStmt*)nodesMakeNode(QUERY_NODE_SELECT_STMT); if (NULL == pSelect) { return TSDB_CODE_OUT_OF_MEMORY; @@ -7526,27 +7658,38 @@ static int32_t createSimpleSelectStmt(const char* pDb, const char* pTable, int32 snprintf(pRealTable->table.tableName, sizeof(pRealTable->table.tableName), "%s", pTable); snprintf(pRealTable->table.tableAlias, sizeof(pRealTable->table.tableAlias), "%s", pTable); pSelect->pFromTable = (SNode*)pRealTable; - - if (numOfProjs >= 0) { - pSelect->pProjectionList = createProjectCols(numOfProjs, pProjCol); - if (NULL == pSelect->pProjectionList) { - nodesDestroyNode((SNode*)pSelect); - return TSDB_CODE_OUT_OF_MEMORY; - } - } + pSelect->pProjectionList = pProjectionList; *pStmt = pSelect; return TSDB_CODE_SUCCESS; } + +static int32_t createSimpleSelectStmtFromCols(const char* pDb, const char* pTable, int32_t numOfProjs, + const char* const pProjCol[], SSelectStmt** pStmt) { + SNodeList* pProjectionList = NULL; + if (numOfProjs >= 0) { + pProjectionList = createProjectCols(numOfProjs, pProjCol); + if (NULL == pProjectionList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + return createSimpleSelectStmtImpl(pDb, pTable, pProjectionList, pStmt); +} + +static int32_t createSimpleSelectStmtFromProjList(const char* pDb, const char* pTable, SNodeList* pProjectionList, SSelectStmt** pStmt) { + return createSimpleSelectStmtImpl(pDb, pTable, pProjectionList, pStmt); +} + static int32_t createSelectStmtForShow(ENodeType showType, SSelectStmt** pStmt) { const SSysTableShowAdapter* pShow = &sysTableShowAdapter[showType - SYSTABLE_SHOW_TYPE_OFFSET]; - return createSimpleSelectStmt(pShow->pDbName, pShow->pTableName, pShow->numOfShowCols, pShow->pShowCols, pStmt); + return createSimpleSelectStmtFromCols(pShow->pDbName, pShow->pTableName, pShow->numOfShowCols, pShow->pShowCols, pStmt); } static int32_t createSelectStmtForShowTableDist(SShowTableDistributedStmt* pStmt, SSelectStmt** pOutput) { - return createSimpleSelectStmt(pStmt->dbName, pStmt->tableName, 0, NULL, pOutput); + return createSimpleSelectStmtFromCols(pStmt->dbName, pStmt->tableName, 0, NULL, pOutput); } static int32_t createOperatorNode(EOperatorType opType, const char* pColName, SNode* pRight, SNode** pOp) { @@ -7680,7 +7823,7 @@ static int32_t createShowTableTagsProjections(SNodeList** pProjections, SNodeLis static int32_t rewriteShowStableTags(STranslateContext* pCxt, SQuery* pQuery) { SShowTableTagsStmt* pShow = (SShowTableTagsStmt*)pQuery->pRoot; SSelectStmt* pSelect = NULL; - int32_t code = createSimpleSelectStmt(((SValueNode*)pShow->pDbName)->literal, ((SValueNode*)pShow->pTbName)->literal, + int32_t code = createSimpleSelectStmtFromCols(((SValueNode*)pShow->pDbName)->literal, ((SValueNode*)pShow->pTbName)->literal, -1, NULL, &pSelect); if (TSDB_CODE_SUCCESS == code) { code = createShowTableTagsProjections(&pSelect->pProjectionList, &pShow->pTags); @@ -9005,6 +9148,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { } break; default: + pQuery->haveResultSet = false; pQuery->execMode = QUERY_EXEC_MODE_RPC; if (NULL != pCxt->pCmdMsg) { TSWAP(pQuery->pCmdMsg, pCxt->pCmdMsg); @@ -9039,6 +9183,10 @@ int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMe if (TSDB_CODE_SUCCESS == code) { code = translateQuery(&cxt, pQuery->pRoot); } + if (TSDB_CODE_SUCCESS == code && (cxt.pPrevRoot || cxt.pPostRoot)) { + pQuery->pPrevRoot = cxt.pPrevRoot; + pQuery->pPostRoot = cxt.pPostRoot; + } if (TSDB_CODE_SUCCESS == code) { code = setQuery(&cxt, pQuery); } diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 28d116c381..cbddaf8115 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -204,7 +204,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata const struct SMetaData* pMetaData, SQuery* pQuery) { SParseMetaCache metaCache = {0}; int32_t code = nodesAcquireAllocator(pCxt->allocatorId); - if (TSDB_CODE_SUCCESS == code) { + if (TSDB_CODE_SUCCESS == code && pCatalogReq) { code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache); } if (TSDB_CODE_SUCCESS == code) { @@ -221,6 +221,19 @@ int32_t qContinueParseSql(SParseContext* pCxt, struct SCatalogReq* pCatalogReq, return parseInsertSql(pCxt, &pQuery, pCatalogReq, pMetaData); } +int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, void** pResRow) { + int32_t code = TSDB_CODE_SUCCESS; + switch (nodeType(pQuery->pRoot)) { + case QUERY_NODE_CREATE_STREAM_STMT: + code = translatePostCreateStream(pCxt, pQuery, pResRow); + break; + default: + break; + } + + return code; +} + void qDestroyParseContext(SParseContext* pCxt) { if (NULL == pCxt) { return; diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 58b8e53478..2fcc8510d4 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -97,6 +97,12 @@ static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDown return TSDB_CODE_SUCCESS; } +int32_t qContinuePlanPostQuery(void *pPostPlan) { + //TODO + return TSDB_CODE_SUCCESS; +} + + int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) { planDebug("QID:0x%" PRIx64 " set subplan execution node, groupId:%d", subplan->id.queryId, groupId); return setSubplanExecutionNode(subplan->pNode, groupId, pSource); From 59b8cfbc7da720a63c9cce6ba59c4d95444f4577 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 12 Jun 2023 17:14:41 +0800 Subject: [PATCH 2/5] enh: add sub_query in systable and kill subquery --- include/common/tmsg.h | 1 + source/client/inc/clientInt.h | 2 + source/client/src/clientEnv.c | 62 +++++++++++++++++++++ source/client/src/clientHb.c | 1 + source/client/src/clientImpl.c | 1 + source/client/src/clientMain.c | 15 +---- source/common/src/systable.c | 1 + source/common/src/tmsg.c | 2 + source/dnode/mnode/impl/src/mndProfile.c | 3 + source/libs/executor/src/exchangeoperator.c | 2 + 10 files changed, 78 insertions(+), 12 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 86308984ef..5d9161b5a3 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2488,6 +2488,7 @@ typedef struct { int64_t stime; // timestamp precision ms int64_t reqRid; bool stableQuery; + bool isSubQuery; char fqdn[TSDB_FQDN_LEN]; int32_t subPlanNum; SArray* subDesc; // SArray diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index afa19c674e..fa444779f3 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -256,6 +256,7 @@ typedef struct SRequestObj { bool validateOnly; // todo refactor bool killed; bool inRetry; + bool isSubReq; uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t retry; int64_t allocatorRefId; @@ -398,6 +399,7 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code); int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj** pNewRequest); int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce); void returnToUser(SRequestObj* pRequest); +void stopAllQueries(SRequestObj *pRequest); #ifdef __cplusplus } diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 4f883ca1f4..c64bbfbdb6 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -363,6 +363,11 @@ void destroySubRequests(SRequestObj *pRequest) { int32_t reqIdx = -1; SRequestObj *pReqList[16] = {NULL}; uint64_t tmpRefId = 0; + + if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) { + return; + } + SRequestObj* pTmp = pRequest; while (pTmp->relation.prevRefId) { tmpRefId = pTmp->relation.prevRefId; @@ -454,6 +459,63 @@ void destroyRequest(SRequestObj *pRequest) { removeRequest(pRequest->self); } +void taosStopQueryImpl(SRequestObj *pRequest) { + pRequest->killed = true; + + // It is not a query, no need to stop. + if (NULL == pRequest->pQuery || QUERY_EXEC_MODE_SCHEDULE != pRequest->pQuery->execMode) { + tscDebug("request 0x%" PRIx64 " no need to be killed since not query", pRequest->requestId); + return; + } + + schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED); + tscDebug("request %" PRIx64 " killed", pRequest->requestId); +} + +void stopAllQueries(SRequestObj *pRequest) { + int32_t reqIdx = -1; + SRequestObj *pReqList[16] = {NULL}; + uint64_t tmpRefId = 0; + + if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) { + return; + } + + SRequestObj* pTmp = pRequest; + while (pTmp->relation.prevRefId) { + tmpRefId = pTmp->relation.prevRefId; + pTmp = acquireRequest(tmpRefId); + if (pTmp) { + pReqList[++reqIdx] = pTmp; + releaseRequest(tmpRefId); + } else { + tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, + tmpRefId, pTmp->requestId); + break; + } + } + + for (int32_t i = reqIdx; i >= 0; i--) { + taosStopQueryImpl(pReqList[i]); + } + + taosStopQueryImpl(pRequest); + + tmpRefId = pRequest->relation.nextRefId; + while (tmpRefId) { + pTmp = acquireRequest(tmpRefId); + if (pTmp) { + tmpRefId = pTmp->relation.nextRefId; + taosStopQueryImpl(pTmp); + releaseRequest(pTmp->self); + } else { + tscError("0x%" PRIx64 " is not there", tmpRefId); + break; + } + } +} + + void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); } static void *tscCrashReportThreadFp(void *param) { diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 2dddfec2bd..cbfa48b322 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -464,6 +464,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { desc.useconds = now - pRequest->metric.start; desc.reqRid = pRequest->self; desc.stableQuery = pRequest->stableQuery; + desc.isSubQuery = pRequest->isSubReq; taosGetFqdn(desc.fqdn); desc.subPlanNum = pRequest->body.subplanNum; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index de14266099..2a73156e8a 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -243,6 +243,7 @@ int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj pRequest->relation.prevRefId = (*pNewRequest)->self; (*pNewRequest)->relation.nextRefId = pRequest->self; (*pNewRequest)->relation.userRefId = pRequest->self; + (*pNewRequest)->isSubReq = true; } return code; } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index ca4acf3edf..7573fd5968 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -563,22 +563,13 @@ int taos_select_db(TAOS *taos, const char *db) { return code; } + void taos_stop_query(TAOS_RES *res) { if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { return; } - SRequestObj *pRequest = (SRequestObj *)res; - pRequest->killed = true; - - // It is not a query, no need to stop. - if (NULL == pRequest->pQuery || QUERY_EXEC_MODE_SCHEDULE != pRequest->pQuery->execMode) { - tscDebug("request 0x%" PRIx64 " no need to be killed since not query", pRequest->requestId); - return; - } - - schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED); - tscDebug("request %" PRIx64 " killed", pRequest->requestId); + stopAllQueries((SRequestObj*)res); } bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { @@ -860,7 +851,7 @@ int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) { void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode* pRoot) { SRequestObj* pNewRequest = NULL; SSqlCallbackWrapper* pNewWrapper = NULL; - int32_t code = buildPreviousRequest(pWrapper->pRequest, "", &pNewRequest); + int32_t code = buildPreviousRequest(pWrapper->pRequest, pWrapper->pRequest->sqlstr, &pNewRequest); if (code) { handleQueryAnslyseRes(pWrapper, pResultMeta, code); return; diff --git a/source/common/src/systable.c b/source/common/src/systable.c index db0cc78de6..f751a9aa9b 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -381,6 +381,7 @@ static const SSysDbTableSchema querySchema[] = { {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "exec_usec", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "stable_query", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = false}, + {.name = "sub_query", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = false}, {.name = "sub_num", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "sub_status", .bytes = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index cd874dad50..c7db62cdec 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -224,6 +224,7 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR if (tEncodeI64(pEncoder, desc->stime) < 0) return -1; if (tEncodeI64(pEncoder, desc->reqRid) < 0) return -1; if (tEncodeI8(pEncoder, desc->stableQuery) < 0) return -1; + if (tEncodeI8(pEncoder, desc->isSubQuery) < 0) return -1; if (tEncodeCStr(pEncoder, desc->fqdn) < 0) return -1; if (tEncodeI32(pEncoder, desc->subPlanNum) < 0) return -1; @@ -291,6 +292,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq) if (tDecodeI64(pDecoder, &desc.stime) < 0) return -1; if (tDecodeI64(pDecoder, &desc.reqRid) < 0) return -1; if (tDecodeI8(pDecoder, (int8_t *)&desc.stableQuery) < 0) return -1; + if (tDecodeI8(pDecoder, (int8_t *)&desc.isSubQuery) < 0) return -1; if (tDecodeCStrTo(pDecoder, desc.fqdn) < 0) return -1; if (tDecodeI32(pDecoder, &desc.subPlanNum) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index a1d815189c..460e75b422 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -834,6 +834,9 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->stableQuery, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->isSubQuery, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 94041140d4..b2d1790faf 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -204,6 +204,8 @@ static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + taosSsleep(5); + if (pOperator->status == OP_EXEC_DONE) { return NULL; } From 7adea3570641953e54bd443276d4b08679986915 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 12 Jun 2023 17:18:38 +0800 Subject: [PATCH 3/5] fix: remove debug info --- source/libs/executor/src/exchangeoperator.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index b2d1790faf..94041140d4 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -204,8 +204,6 @@ static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - taosSsleep(5); - if (pOperator->status == OP_EXEC_DONE) { return NULL; } From 48b9cac75c4a81050463616a13075b3cdd2b9c3b Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 13 Jun 2023 11:44:13 +0800 Subject: [PATCH 4/5] fix: fix no data in db issue --- source/libs/parser/src/parTranslater.c | 20 +++++++++++++------- source/libs/parser/test/parInitialCTest.cpp | 6 +++--- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 905b32a5f3..b6191dfae5 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6932,20 +6932,26 @@ int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pQuery->pRoot; STranslateContext cxt = {0}; SInterval interval = {0}; - - if (NULL == pResRow || NULL == pResRow[0]) { - return TSDB_CODE_PAR_INTERNAL_ERROR; - } + int64_t lastTs = 0; int32_t code = initTranslateContext(pParseCxt, NULL, &cxt); if (TSDB_CODE_SUCCESS == code) { code = buildIntervalForCreateStream(pStmt, &interval); } if (TSDB_CODE_SUCCESS == code) { - if (interval.interval > 0) { - pStmt->pReq->lastTs = taosTimeTruncate(*(int64_t*)pResRow[0], &interval); + if (pResRow && pResRow[0]) { + lastTs = *(int64_t*)pResRow[0]; + } else if (interval.interval > 0) { + lastTs = convertTimePrecision(taosGetTimestampMs(), TSDB_TIME_PRECISION_MILLI, interval.precision); } else { - pStmt->pReq->lastTs = *(int64_t*)pResRow[0]; + lastTs = taosGetTimestampMs(); + } + } + if (TSDB_CODE_SUCCESS == code) { + if (interval.interval > 0) { + pStmt->pReq->lastTs = taosTimeTruncate(lastTs, &interval); + } else { + pStmt->pReq->lastTs = lastTs; } code = buildCmdMsg(&cxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, pStmt->pReq); } diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp index a4e8bdd87a..f6dfa93ab2 100644 --- a/source/libs/parser/test/parInitialCTest.cpp +++ b/source/libs/parser/test/parInitialCTest.cpp @@ -885,12 +885,12 @@ TEST_F(ParserInitialCTest, createStream) { setCreateStreamReq( "s1", "test", - "create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 ignore " + "create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 0 ignore " "update 1 into st3 as select count(*) from t1 interval(10s)", "st3", 1); setStreamOptions(STREAM_CREATE_STABLE_TRUE, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, - 10 * MILLISECOND_PER_SECOND, 0, 1, 1); - run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 IGNORE " + 10 * MILLISECOND_PER_SECOND, 0, 0, 1); + run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 0 IGNORE " "UPDATE 1 INTO st3 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)"); clearCreateStreamReq(); From b8b3036905e893ec0424445c88eb44f8939be7e5 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 13 Jun 2023 13:53:40 +0800 Subject: [PATCH 5/5] fix: fix case issue --- tests/system-test/2-query/odbc.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system-test/2-query/odbc.py b/tests/system-test/2-query/odbc.py index 5241406b65..9f51bc58da 100644 --- a/tests/system-test/2-query/odbc.py +++ b/tests/system-test/2-query/odbc.py @@ -22,8 +22,8 @@ class TDTestCase: tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)") tdSql.query("select count(*) from information_schema.ins_columns") - # enterprise version: 288, community version: 280 - tdSql.checkData(0, 0, 288) + # enterprise version: 289, community version: 281 + tdSql.checkData(0, 0, 289) tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'") tdSql.checkRows(14)