feat: support fill history with sub request
This commit is contained in:
parent
5ed3d5a629
commit
170182fecd
|
@ -1975,6 +1975,7 @@ typedef struct {
|
|||
SArray* fillNullCols; // array of SColLocation
|
||||
int64_t deleteMark;
|
||||
int8_t igUpdate;
|
||||
int64_t lastTs;
|
||||
} SCMCreateStreamReq;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -87,6 +87,7 @@ typedef struct SCatalogReq {
|
|||
bool dNodeRequired; // valid dnode
|
||||
bool svrVerRequired;
|
||||
bool forceUpdate;
|
||||
bool cloned;
|
||||
} SCatalogReq;
|
||||
|
||||
typedef struct SMetaRes {
|
||||
|
|
|
@ -233,6 +233,7 @@ bool fmIsGroupKeyFunc(int32_t funcId);
|
|||
bool fmIsBlockDistFunc(int32_t funcId);
|
||||
|
||||
void getLastCacheDataType(SDataType* pType);
|
||||
SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList);
|
||||
|
||||
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);
|
||||
|
||||
|
|
|
@ -425,16 +425,18 @@ typedef struct SStreamOptions {
|
|||
} SStreamOptions;
|
||||
|
||||
typedef struct SCreateStreamStmt {
|
||||
ENodeType type;
|
||||
char streamName[TSDB_TABLE_NAME_LEN];
|
||||
char targetDbName[TSDB_DB_NAME_LEN];
|
||||
char targetTabName[TSDB_TABLE_NAME_LEN];
|
||||
bool ignoreExists;
|
||||
SStreamOptions* pOptions;
|
||||
SNode* pQuery;
|
||||
SNodeList* pTags;
|
||||
SNode* pSubtable;
|
||||
SNodeList* pCols;
|
||||
ENodeType type;
|
||||
char streamName[TSDB_TABLE_NAME_LEN];
|
||||
char targetDbName[TSDB_DB_NAME_LEN];
|
||||
char targetTabName[TSDB_TABLE_NAME_LEN];
|
||||
bool ignoreExists;
|
||||
SStreamOptions* pOptions;
|
||||
SNode* pQuery;
|
||||
SNode* pPrevQuery;
|
||||
SNodeList* pTags;
|
||||
SNode* pSubtable;
|
||||
SNodeList* pCols;
|
||||
SCMCreateStreamReq* pReq;
|
||||
} SCreateStreamStmt;
|
||||
|
||||
typedef struct SDropStreamStmt {
|
||||
|
|
|
@ -617,6 +617,7 @@ typedef struct SQueryPlan {
|
|||
int32_t numOfSubplans;
|
||||
SNodeList* pSubplans; // Element is SNodeListNode. The execution level of subplan, starting from 0.
|
||||
SExplainInfo explainInfo;
|
||||
void* pPostPlan;
|
||||
} SQueryPlan;
|
||||
|
||||
const char* dataOrderStr(EDataOrderLevel order);
|
||||
|
|
|
@ -441,7 +441,9 @@ typedef struct SQuery {
|
|||
EQueryExecStage execStage;
|
||||
EQueryExecMode execMode;
|
||||
bool haveResultSet;
|
||||
SNode* pPrevRoot;
|
||||
SNode* pRoot;
|
||||
SNode* pPostRoot;
|
||||
int32_t numOfResCols;
|
||||
SSchema* pResSchema;
|
||||
int8_t precision;
|
||||
|
|
|
@ -74,6 +74,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata
|
|||
const struct SMetaData* pMetaData, SQuery* pQuery);
|
||||
int32_t qContinueParseSql(SParseContext* pCxt, struct SCatalogReq* pCatalogReq, const struct SMetaData* pMetaData,
|
||||
SQuery* pQuery);
|
||||
int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, void** pResRow);
|
||||
|
||||
void qDestroyParseContext(SParseContext* pCxt);
|
||||
|
||||
|
|
|
@ -52,6 +52,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
|
|||
// @groupId id of a group of datasource subplans of this @pSubplan
|
||||
// @pSource one execution location of this group of datasource subplans
|
||||
int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource);
|
||||
int32_t qContinuePlanPostQuery(void *pPostPlan);
|
||||
|
||||
void qClearSubplanExecutionNode(SSubplan* pSubplan);
|
||||
|
||||
|
|
|
@ -227,6 +227,12 @@ typedef struct {
|
|||
STaosxRsp rsp;
|
||||
} SMqTaosxRspObj;
|
||||
|
||||
typedef struct SReqRelInfo {
|
||||
uint64_t userRefId;
|
||||
uint64_t prevRefId;
|
||||
uint64_t nextRefId;
|
||||
} SReqRelInfo;
|
||||
|
||||
typedef struct SRequestObj {
|
||||
int8_t resType; // query or tmq
|
||||
uint64_t requestId;
|
||||
|
@ -254,6 +260,9 @@ typedef struct SRequestObj {
|
|||
uint32_t retry;
|
||||
int64_t allocatorRefId;
|
||||
SQuery* pQuery;
|
||||
void* pPostPlan;
|
||||
SReqRelInfo relation;
|
||||
void* pWrapper;
|
||||
} SRequestObj;
|
||||
|
||||
typedef struct SSyncQueryParam {
|
||||
|
@ -279,6 +288,7 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly,
|
|||
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly);
|
||||
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
|
||||
int64_t reqid);
|
||||
void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param);
|
||||
|
||||
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols);
|
||||
|
||||
|
@ -368,6 +378,7 @@ typedef struct SSqlCallbackWrapper {
|
|||
SParseContext* pParseCtx;
|
||||
SCatalogReq* pCatalogReq;
|
||||
SRequestObj* pRequest;
|
||||
void* pPlanInfo;
|
||||
} SSqlCallbackWrapper;
|
||||
|
||||
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res);
|
||||
|
@ -382,6 +393,11 @@ int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog);
|
|||
bool qnodeRequired(SRequestObj* pRequest);
|
||||
void continueInsertFromCsv(SSqlCallbackWrapper* pWrapper, SRequestObj* pRequest);
|
||||
void destorySqlCallbackWrapper(SSqlCallbackWrapper* pWrapper);
|
||||
void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code);
|
||||
void restartAsyncQuery(SRequestObj *pRequest, int32_t code);
|
||||
int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj** pNewRequest);
|
||||
int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce);
|
||||
void returnToUser(SRequestObj* pRequest);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -358,6 +358,44 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri
|
|||
|
||||
int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); }
|
||||
|
||||
|
||||
void destroySubRequests(SRequestObj *pRequest) {
|
||||
int32_t reqIdx = -1;
|
||||
SRequestObj *pReqList[16] = {NULL};
|
||||
uint64_t tmpRefId = 0;
|
||||
SRequestObj* pTmp = pRequest;
|
||||
while (pTmp->relation.prevRefId) {
|
||||
tmpRefId = pTmp->relation.prevRefId;
|
||||
pTmp = acquireRequest(tmpRefId);
|
||||
if (pTmp) {
|
||||
pReqList[++reqIdx] = pTmp;
|
||||
releaseRequest(tmpRefId);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self,
|
||||
tmpRefId, pTmp->requestId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = reqIdx; i >= 0; i--) {
|
||||
removeRequest(pReqList[i]->self);
|
||||
}
|
||||
|
||||
tmpRefId = pRequest->relation.nextRefId;
|
||||
while (tmpRefId) {
|
||||
pTmp = acquireRequest(tmpRefId);
|
||||
if (pTmp) {
|
||||
tmpRefId = pTmp->relation.nextRefId;
|
||||
removeRequest(pTmp->self);
|
||||
releaseRequest(pTmp->self);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 " is not there", tmpRefId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void doDestroyRequest(void *p) {
|
||||
if (NULL == p) {
|
||||
return;
|
||||
|
@ -368,10 +406,14 @@ void doDestroyRequest(void *p) {
|
|||
uint64_t reqId = pRequest->requestId;
|
||||
tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
|
||||
|
||||
destroySubRequests(pRequest);
|
||||
|
||||
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
|
||||
|
||||
schedulerFreeJob(&pRequest->body.queryJob, 0);
|
||||
|
||||
destorySqlCallbackWrapper(pRequest->pWrapper);
|
||||
|
||||
taosMemoryFreeClear(pRequest->msgBuf);
|
||||
taosMemoryFreeClear(pRequest->pDb);
|
||||
|
||||
|
|
|
@ -237,6 +237,16 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj** pNewRequest) {
|
||||
int32_t code = buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pRequest->relation.prevRefId = (*pNewRequest)->self;
|
||||
(*pNewRequest)->relation.nextRefId = pRequest->self;
|
||||
(*pNewRequest)->relation.userRefId = pRequest->self;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
|
||||
|
@ -878,6 +888,81 @@ static bool incompletaFileParsing(SNode* pStmt) {
|
|||
return QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pStmt) ? false : ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
|
||||
}
|
||||
|
||||
void continuePostSubQuery(SRequestObj* pRequest, TAOS_ROW row) {
|
||||
SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;
|
||||
int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
int64_t analyseStart = taosGetTimestampUs();
|
||||
code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, (void**)row);
|
||||
pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = qContinuePlanPostQuery(pRequest->pPostPlan);
|
||||
}
|
||||
nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
|
||||
|
||||
handleQueryAnslyseRes(pWrapper, NULL, code);
|
||||
}
|
||||
|
||||
void returnToUser(SRequestObj* pRequest) {
|
||||
if (pRequest->relation.userRefId == pRequest->self || 0 == pRequest->relation.userRefId) {
|
||||
// return to client
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, pRequest->code);
|
||||
return;
|
||||
}
|
||||
|
||||
SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId);
|
||||
if (pUserReq) {
|
||||
pUserReq->code = pRequest->code;
|
||||
// return to client
|
||||
pUserReq->body.queryFp(pUserReq->body.param, pUserReq, pUserReq->code);
|
||||
releaseRequest(pRequest->relation.userRefId);
|
||||
return;
|
||||
} else {
|
||||
tscError("0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pRequest->self,
|
||||
pRequest->relation.userRefId, pRequest->requestId);
|
||||
}
|
||||
}
|
||||
|
||||
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
|
||||
SRequestObj* pRequest = (SRequestObj*)res;
|
||||
if (pRequest->code) {
|
||||
returnToUser(pRequest);
|
||||
return;
|
||||
}
|
||||
|
||||
TAOS_ROW row = NULL;
|
||||
if (rowNum > 0) {
|
||||
row = taos_fetch_row(res); // for single row only now
|
||||
}
|
||||
|
||||
SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
|
||||
if (pNextReq) {
|
||||
continuePostSubQuery(pNextReq, row);
|
||||
releaseRequest(pRequest->relation.nextRefId);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pRequest->self,
|
||||
pRequest->relation.nextRefId, pRequest->requestId);
|
||||
}
|
||||
}
|
||||
|
||||
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
|
||||
SRequestObj* pRequest = pWrapper->pRequest;
|
||||
if (TD_RES_QUERY(pRequest)) {
|
||||
taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper);
|
||||
return;
|
||||
}
|
||||
|
||||
SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
|
||||
if (pNextReq) {
|
||||
continuePostSubQuery(pNextReq, NULL);
|
||||
releaseRequest(pRequest->relation.nextRefId);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pRequest->self,
|
||||
pRequest->relation.nextRefId, pRequest->requestId);
|
||||
}
|
||||
}
|
||||
|
||||
// todo refacto the error code mgmt
|
||||
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
||||
SSqlCallbackWrapper* pWrapper = param;
|
||||
|
@ -912,12 +997,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
|||
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) {
|
||||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self,
|
||||
tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||
pRequest->prevCode = code;
|
||||
schedulerFreeJob(&pRequest->body.queryJob, 0);
|
||||
qDestroyQuery(pRequest->pQuery);
|
||||
pRequest->pQuery = NULL;
|
||||
destorySqlCallbackWrapper(pWrapper);
|
||||
doAsyncQuery(pRequest, true);
|
||||
restartAsyncQuery(pRequest, code);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -938,10 +1018,15 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
|||
return;
|
||||
}
|
||||
|
||||
destorySqlCallbackWrapper(pWrapper);
|
||||
if (pRequest->relation.nextRefId) {
|
||||
handlePostSubQuery(pWrapper);
|
||||
} else {
|
||||
destorySqlCallbackWrapper(pWrapper);
|
||||
pRequest->pWrapper = NULL;
|
||||
|
||||
// return to client
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||
// return to client
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||
}
|
||||
}
|
||||
|
||||
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
|
||||
|
@ -1049,6 +1134,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
|
|||
pRequest->requestId);
|
||||
} else {
|
||||
pRequest->body.subplanNum = pDag->numOfSubplans;
|
||||
TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
|
||||
}
|
||||
|
||||
pRequest->metric.execStart = taosGetTimestampUs();
|
||||
|
@ -1084,6 +1170,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
|
|||
tscDebug("0x%" PRIx64 " plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||
pRequest->requestId);
|
||||
destorySqlCallbackWrapper(pWrapper);
|
||||
pRequest->pWrapper = NULL;
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
pRequest->code = terrno;
|
||||
}
|
||||
|
@ -1103,6 +1190,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
|
|||
pRequest->body.execMode = pQuery->execMode;
|
||||
if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
|
||||
destorySqlCallbackWrapper(pWrapper);
|
||||
pRequest->pWrapper = NULL;
|
||||
}
|
||||
|
||||
if (pQuery->pRoot && !pRequest->inRetry) {
|
||||
|
@ -2402,3 +2490,90 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly,
|
|||
|
||||
return pRequest;
|
||||
}
|
||||
|
||||
|
||||
static void fetchCallback(void *pResult, void *param, int32_t code) {
|
||||
SRequestObj *pRequest = (SRequestObj *)param;
|
||||
|
||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||
|
||||
tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
|
||||
tstrerror(code), pRequest->requestId);
|
||||
|
||||
pResultInfo->pData = pResult;
|
||||
pResultInfo->numOfRows = 0;
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pRequest->code = code;
|
||||
taosMemoryFreeClear(pResultInfo->pData);
|
||||
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFreeClear(pResultInfo->pData);
|
||||
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
pRequest->code =
|
||||
setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, true);
|
||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
pResultInfo->numOfRows = 0;
|
||||
pRequest->code = code;
|
||||
tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||
pRequest->requestId);
|
||||
} else {
|
||||
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
|
||||
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
|
||||
pRequest->requestId);
|
||||
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
|
||||
}
|
||||
|
||||
pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows);
|
||||
}
|
||||
|
||||
void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param) {
|
||||
pRequest->body.fetchFp = fp;
|
||||
pRequest->body.param = param;
|
||||
|
||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||
|
||||
// this query has no results or error exists, return directly
|
||||
if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
pResultInfo->numOfRows = 0;
|
||||
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
|
||||
return;
|
||||
}
|
||||
|
||||
// all data has returned to App already, no need to try again
|
||||
if (pResultInfo->completed) {
|
||||
// it is a local executed query, no need to do async fetch
|
||||
if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
|
||||
if (pResultInfo->localResultFetched) {
|
||||
pResultInfo->numOfRows = 0;
|
||||
pResultInfo->current = 0;
|
||||
} else {
|
||||
pResultInfo->localResultFetched = true;
|
||||
}
|
||||
} else {
|
||||
pResultInfo->numOfRows = 0;
|
||||
}
|
||||
|
||||
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
|
||||
return;
|
||||
}
|
||||
|
||||
SSchedulerReq req = {
|
||||
.syncReq = false,
|
||||
.fetchFp = fetchCallback,
|
||||
.cbParam = pRequest,
|
||||
};
|
||||
|
||||
schedulerFetchRows(pRequest->body.queryJob, &req);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -774,8 +774,13 @@ static void destoryCatalogReq(SCatalogReq *pCatalogReq) {
|
|||
taosArrayDestroy(pCatalogReq->pDbVgroup);
|
||||
taosArrayDestroy(pCatalogReq->pDbCfg);
|
||||
taosArrayDestroy(pCatalogReq->pDbInfo);
|
||||
taosArrayDestroyEx(pCatalogReq->pTableMeta, destoryTablesReq);
|
||||
taosArrayDestroyEx(pCatalogReq->pTableHash, destoryTablesReq);
|
||||
if (pCatalogReq->cloned) {
|
||||
taosArrayDestroy(pCatalogReq->pTableMeta);
|
||||
taosArrayDestroy(pCatalogReq->pTableHash);
|
||||
} else {
|
||||
taosArrayDestroyEx(pCatalogReq->pTableMeta, destoryTablesReq);
|
||||
taosArrayDestroyEx(pCatalogReq->pTableHash, destoryTablesReq);
|
||||
}
|
||||
taosArrayDestroy(pCatalogReq->pUdf);
|
||||
taosArrayDestroy(pCatalogReq->pIndex);
|
||||
taosArrayDestroy(pCatalogReq->pUser);
|
||||
|
@ -794,26 +799,108 @@ void destorySqlCallbackWrapper(SSqlCallbackWrapper *pWrapper) {
|
|||
taosMemoryFree(pWrapper);
|
||||
}
|
||||
|
||||
void destroyCtxInRequest(SRequestObj* pRequest) {
|
||||
schedulerFreeJob(&pRequest->body.queryJob, 0);
|
||||
qDestroyQuery(pRequest->pQuery);
|
||||
pRequest->pQuery = NULL;
|
||||
destorySqlCallbackWrapper(pRequest->pWrapper);
|
||||
pRequest->pWrapper = NULL;
|
||||
}
|
||||
|
||||
|
||||
static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t code) {
|
||||
SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param;
|
||||
SRequestObj *pRequest = pWrapper->pRequest;
|
||||
SQuery *pQuery = pRequest->pQuery;
|
||||
|
||||
int64_t analyseStart = taosGetTimestampUs();
|
||||
pRequest->metric.ctgCostUs = analyseStart - pRequest->metric.ctgStart;
|
||||
qDebug("0x%" PRIx64 " start to semantic analysis, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId);
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
int64_t analyseStart = taosGetTimestampUs();
|
||||
pRequest->metric.ctgCostUs = analyseStart - pRequest->metric.ctgStart;
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = qAnalyseSqlSemantic(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery);
|
||||
}
|
||||
|
||||
pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
|
||||
|
||||
handleQueryAnslyseRes(pWrapper, pResultMeta, code);
|
||||
}
|
||||
|
||||
int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SCatalogReq* pTarget = taosMemoryCalloc(1, sizeof(SCatalogReq));
|
||||
if (pTarget == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
pTarget->pDbVgroup = taosArrayDup(pSrc->pDbVgroup, NULL);
|
||||
pTarget->pDbCfg = taosArrayDup(pSrc->pDbCfg, NULL);
|
||||
pTarget->pDbInfo = taosArrayDup(pSrc->pDbInfo, NULL);
|
||||
pTarget->pTableMeta = taosArrayDup(pSrc->pTableMeta, NULL);
|
||||
pTarget->pTableHash = taosArrayDup(pSrc->pTableHash, NULL);
|
||||
pTarget->pUdf = taosArrayDup(pSrc->pUdf, NULL);
|
||||
pTarget->pIndex = taosArrayDup(pSrc->pIndex, NULL);
|
||||
pTarget->pUser = taosArrayDup(pSrc->pUser, NULL);
|
||||
pTarget->pTableIndex = taosArrayDup(pSrc->pTableIndex, NULL);
|
||||
pTarget->pTableCfg = taosArrayDup(pSrc->pTableCfg, NULL);
|
||||
pTarget->pTableTag = taosArrayDup(pSrc->pTableTag, NULL);
|
||||
pTarget->qNodeRequired = pSrc->qNodeRequired;
|
||||
pTarget->dNodeRequired = pSrc->dNodeRequired;
|
||||
pTarget->svrVerRequired = pSrc->svrVerRequired;
|
||||
pTarget->forceUpdate = pSrc->forceUpdate;
|
||||
pTarget->cloned = true;
|
||||
|
||||
*ppTarget = pTarget;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode* pRoot) {
|
||||
SRequestObj* pNewRequest = NULL;
|
||||
SSqlCallbackWrapper* pNewWrapper = NULL;
|
||||
int32_t code = buildPreviousRequest(pWrapper->pRequest, "", &pNewRequest);
|
||||
if (code) {
|
||||
handleQueryAnslyseRes(pWrapper, pResultMeta, code);
|
||||
return;
|
||||
}
|
||||
|
||||
pNewRequest->pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
|
||||
if (NULL == pNewRequest->pQuery) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
pNewRequest->pQuery->pRoot = pRoot;
|
||||
pRoot = NULL;
|
||||
pNewRequest->pQuery->execStage = QUERY_EXEC_STAGE_ANALYSE;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = prepareAndParseSqlSyntax(&pNewWrapper, pNewRequest, false);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = cloneCatalogReq(&pNewWrapper->pCatalogReq, pWrapper->pCatalogReq);
|
||||
}
|
||||
doAsyncQueryFromAnalyse(pResultMeta, pNewWrapper, code);
|
||||
nodesDestroyNode(pRoot);
|
||||
}
|
||||
|
||||
void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code) {
|
||||
SRequestObj *pRequest = pWrapper->pRequest;
|
||||
SQuery *pQuery = pRequest->pQuery;
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS && pQuery->pPrevRoot) {
|
||||
SNode* prevRoot = pQuery->pPrevRoot;
|
||||
pQuery->pPrevRoot = NULL;
|
||||
handleSubQueryFromAnalyse(pWrapper, pResultMeta, prevRoot);
|
||||
return;
|
||||
}
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pRequest->stableQuery = pQuery->stableQuery;
|
||||
if (pQuery->pRoot) {
|
||||
pRequest->stmtType = pQuery->pRoot->type;
|
||||
}
|
||||
}
|
||||
|
||||
pRequest->metric.analyseCostUs = taosGetTimestampUs() - analyseStart;
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
if (pQuery->haveResultSet) {
|
||||
setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols);
|
||||
setResPrecision(&pRequest->body.resInfo, pQuery->precision);
|
||||
|
@ -826,14 +913,14 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
|
|||
launchAsyncQuery(pRequest, pQuery, pResultMeta, pWrapper);
|
||||
} else {
|
||||
destorySqlCallbackWrapper(pWrapper);
|
||||
pRequest->pWrapper = NULL;
|
||||
qDestroyQuery(pRequest->pQuery);
|
||||
pRequest->pQuery = NULL;
|
||||
|
||||
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
||||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
|
||||
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||
pRequest->prevCode = code;
|
||||
doAsyncQuery(pRequest, true);
|
||||
restartAsyncQuery(pRequest, code);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -841,7 +928,7 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
|
|||
tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:0x%" PRIx64, pRequest->self,
|
||||
tstrerror(code), pRequest->requestId);
|
||||
pRequest->code = code;
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||
returnToUser(pRequest);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -904,6 +991,7 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c
|
|||
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pWrapper->pRequest->self, code,
|
||||
tstrerror(code), pWrapper->pRequest->requestId);
|
||||
destorySqlCallbackWrapper(pWrapper);
|
||||
pRequest->pWrapper = NULL;
|
||||
terrno = code;
|
||||
pRequest->code = code;
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||
|
@ -920,6 +1008,7 @@ void continueInsertFromCsv(SSqlCallbackWrapper *pWrapper, SRequestObj *pRequest)
|
|||
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pWrapper->pRequest->self, code,
|
||||
tstrerror(code), pWrapper->pRequest->requestId);
|
||||
destorySqlCallbackWrapper(pWrapper);
|
||||
pRequest->pWrapper = NULL;
|
||||
terrno = code;
|
||||
pRequest->code = code;
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||
|
@ -967,27 +1056,16 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||
int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
SSqlCallbackWrapper *pWrapper = NULL;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) {
|
||||
code = pRequest->prevCode;
|
||||
terrno = code;
|
||||
pRequest->code = code;
|
||||
tscDebug("call sync query cb with code: %s", tstrerror(code));
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||
return;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
|
||||
if (pWrapper == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
pWrapper->pRequest = pRequest;
|
||||
}
|
||||
SSqlCallbackWrapper *pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
|
||||
if (pWrapper == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
pWrapper->pRequest = pRequest;
|
||||
pRequest->pWrapper = pWrapper;
|
||||
*ppWrapper = pWrapper;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -999,7 +1077,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
|||
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pWrapper->pParseCtx->pCatalog);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (TSDB_CODE_SUCCESS == code && NULL == pRequest->pQuery) {
|
||||
int64_t syntaxStart = taosGetTimestampUs();
|
||||
|
||||
pWrapper->pCatalogReq = taosMemoryCalloc(1, sizeof(SCatalogReq));
|
||||
|
@ -1014,6 +1092,27 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
|||
pRequest->metric.parseCostUs += taosGetTimestampUs() - syntaxStart;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||
SSqlCallbackWrapper *pWrapper = NULL;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) {
|
||||
code = pRequest->prevCode;
|
||||
terrno = code;
|
||||
pRequest->code = code;
|
||||
tscDebug("call sync query cb with code: %s", tstrerror(code));
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||
return;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = prepareAndParseSqlSyntax(&pWrapper, pRequest, updateMetaForce);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pRequest->stmtType = pRequest->pQuery->pRoot->type;
|
||||
code = phaseAsyncQuery(pWrapper);
|
||||
|
@ -1023,6 +1122,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
|||
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
|
||||
pRequest->requestId);
|
||||
destorySqlCallbackWrapper(pWrapper);
|
||||
pRequest->pWrapper = NULL;
|
||||
qDestroyQuery(pRequest->pQuery);
|
||||
pRequest->pQuery = NULL;
|
||||
|
||||
|
@ -1040,48 +1140,57 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
|||
}
|
||||
}
|
||||
|
||||
static void fetchCallback(void *pResult, void *param, int32_t code) {
|
||||
SRequestObj *pRequest = (SRequestObj *)param;
|
||||
|
||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||
|
||||
tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
|
||||
tstrerror(code), pRequest->requestId);
|
||||
|
||||
pResultInfo->pData = pResult;
|
||||
pResultInfo->numOfRows = 0;
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pRequest->code = code;
|
||||
taosMemoryFreeClear(pResultInfo->pData);
|
||||
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0);
|
||||
return;
|
||||
void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
|
||||
int32_t reqIdx = 0;
|
||||
SRequestObj *pReqList[16] = {NULL};
|
||||
SRequestObj *pUserReq = NULL;
|
||||
pReqList[0] = pRequest;
|
||||
uint64_t tmpRefId = 0;
|
||||
SRequestObj* pTmp = pRequest;
|
||||
while (pTmp->relation.prevRefId) {
|
||||
tmpRefId = pTmp->relation.prevRefId;
|
||||
pTmp = acquireRequest(tmpRefId);
|
||||
if (pTmp) {
|
||||
pReqList[++reqIdx] = pTmp;
|
||||
releaseRequest(tmpRefId);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self,
|
||||
tmpRefId, pTmp->requestId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFreeClear(pResultInfo->pData);
|
||||
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0);
|
||||
return;
|
||||
tmpRefId = pRequest->relation.nextRefId;
|
||||
while (tmpRefId) {
|
||||
pTmp = acquireRequest(tmpRefId);
|
||||
if (pTmp) {
|
||||
tmpRefId = pTmp->relation.nextRefId;
|
||||
removeRequest(pTmp->self);
|
||||
releaseRequest(pTmp->self);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 " is not there", tmpRefId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pRequest->code =
|
||||
setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, true);
|
||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
pResultInfo->numOfRows = 0;
|
||||
pRequest->code = code;
|
||||
tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||
pRequest->requestId);
|
||||
for (int32_t i = reqIdx; i >= 0; i--) {
|
||||
destroyCtxInRequest(pReqList[i]);
|
||||
if (pReqList[i]->relation.userRefId == pReqList[i]->self || 0 == pReqList[i]->relation.userRefId) {
|
||||
pUserReq = pReqList[i];
|
||||
} else {
|
||||
removeRequest(pReqList[i]->self);
|
||||
}
|
||||
}
|
||||
|
||||
if (pUserReq) {
|
||||
pUserReq->prevCode = code;
|
||||
memset(&pUserReq->relation, 0, sizeof(pUserReq->relation));
|
||||
} else {
|
||||
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
|
||||
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
|
||||
pRequest->requestId);
|
||||
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
|
||||
tscError("user req is missing");
|
||||
return;
|
||||
}
|
||||
|
||||
pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows);
|
||||
doAsyncQuery(pUserReq, true);
|
||||
}
|
||||
|
||||
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
||||
|
@ -1095,43 +1204,8 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
|||
}
|
||||
|
||||
SRequestObj *pRequest = res;
|
||||
pRequest->body.fetchFp = fp;
|
||||
pRequest->body.param = param;
|
||||
|
||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||
|
||||
// this query has no results or error exists, return directly
|
||||
if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
pResultInfo->numOfRows = 0;
|
||||
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
|
||||
return;
|
||||
}
|
||||
|
||||
// all data has returned to App already, no need to try again
|
||||
if (pResultInfo->completed) {
|
||||
// it is a local executed query, no need to do async fetch
|
||||
if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
|
||||
if (pResultInfo->localResultFetched) {
|
||||
pResultInfo->numOfRows = 0;
|
||||
pResultInfo->current = 0;
|
||||
} else {
|
||||
pResultInfo->localResultFetched = true;
|
||||
}
|
||||
} else {
|
||||
pResultInfo->numOfRows = 0;
|
||||
}
|
||||
|
||||
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
|
||||
return;
|
||||
}
|
||||
|
||||
SSchedulerReq req = {
|
||||
.syncReq = false,
|
||||
.fetchFp = fetchCallback,
|
||||
.cbParam = pRequest,
|
||||
};
|
||||
|
||||
schedulerFetchRows(pRequest->body.queryJob, &req);
|
||||
taosAsyncFetchImpl(pRequest, fp, param);
|
||||
}
|
||||
|
||||
void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
||||
|
|
|
@ -6122,6 +6122,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
|
|||
}
|
||||
if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->igUpdate) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->lastTs) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -6207,6 +6208,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
|
|||
|
||||
if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->igUpdate) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->lastTs) < 0) return -1;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
|
@ -6273,6 +6275,9 @@ int32_t tDeserializeSMRecoverStreamReq(void *buf, int32_t bufLen, SMRecoverStrea
|
|||
}
|
||||
|
||||
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
|
||||
if (NULL == pReq) {
|
||||
return;
|
||||
}
|
||||
taosArrayDestroy(pReq->pTags);
|
||||
taosMemoryFreeClear(pReq->sql);
|
||||
taosMemoryFreeClear(pReq->ast);
|
||||
|
|
|
@ -344,7 +344,7 @@ static int32_t getFuncInfo(SFunctionNode* pFunc) {
|
|||
return fmGetFuncInfo(pFunc, msg, sizeof(msg));
|
||||
}
|
||||
|
||||
static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) {
|
||||
SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) {
|
||||
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||
if (NULL == pFunc) {
|
||||
return NULL;
|
||||
|
|
|
@ -953,6 +953,8 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
nodesDestroyNode(pStmt->pQuery);
|
||||
nodesDestroyList(pStmt->pTags);
|
||||
nodesDestroyNode(pStmt->pSubtable);
|
||||
tFreeSCMCreateStreamReq(pStmt->pReq);
|
||||
taosMemoryFreeClear(pStmt->pReq);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_DROP_STREAM_STMT: // no pointer field
|
||||
|
@ -1052,6 +1054,7 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
case QUERY_NODE_QUERY: {
|
||||
SQuery* pQuery = (SQuery*)pNode;
|
||||
nodesDestroyNode(pQuery->pRoot);
|
||||
nodesDestroyNode(pQuery->pPostRoot);
|
||||
taosMemoryFreeClear(pQuery->pResSchema);
|
||||
if (NULL != pQuery->pCmdMsg) {
|
||||
taosMemoryFreeClear(pQuery->pCmdMsg->pMsg);
|
||||
|
|
|
@ -34,6 +34,7 @@ int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache*
|
|||
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache);
|
||||
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
|
||||
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery);
|
||||
int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -384,6 +384,10 @@ static int32_t collectMetaKeyFromCreateStream(SCollectMetaKeyCxt* pCxt, SCreateS
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && pStmt->pOptions->fillHistory) {
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||
code = reserveDbCfgForLastRow(pCxt, pSelect->pFromTable);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -53,6 +53,8 @@ typedef struct STranslateContext {
|
|||
bool createStream;
|
||||
bool stableQuery;
|
||||
bool showRewrite;
|
||||
SNode* pPrevRoot;
|
||||
SNode* pPostRoot;
|
||||
} STranslateContext;
|
||||
|
||||
typedef struct SBuildTopicContext {
|
||||
|
@ -276,6 +278,10 @@ static const SSysTableShowAdapter sysTableShowAdapter[] = {
|
|||
static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode);
|
||||
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode);
|
||||
static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal);
|
||||
static int32_t createSimpleSelectStmtFromProjList(const char* pDb, const char* pTable, SNodeList* pProjectionList, SSelectStmt** pStmt);
|
||||
static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta, SNode** pQuery);
|
||||
static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery);
|
||||
static int32_t setRefreshMate(STranslateContext* pCxt, SQuery* pQuery);
|
||||
|
||||
static bool afterGroupBy(ESqlClause clause) { return clause > SQL_CLAUSE_GROUP_BY; }
|
||||
|
||||
|
@ -6730,6 +6736,54 @@ static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStream
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta, SNode** pQuery) {
|
||||
SColumnNode* col = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == col) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
strcpy(col->tableAlias, pTable);
|
||||
strcpy(col->colName, pMeta->schema[0].name);
|
||||
SNodeList* pParamterList = nodesMakeList();
|
||||
if (NULL == pParamterList) {
|
||||
nodesDestroyNode((SNode *)col);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t code = nodesListStrictAppend(pParamterList, (SNode *)col);
|
||||
if (code) {
|
||||
nodesDestroyNode((SNode *)col);
|
||||
nodesDestroyList(pParamterList);
|
||||
return code;
|
||||
}
|
||||
|
||||
SNode* pFunc = (SNode*)createFunction("last", pParamterList);
|
||||
if (NULL == pFunc) {
|
||||
nodesDestroyList(pParamterList);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SNodeList* pProjectionList = nodesMakeList();
|
||||
if (NULL == pProjectionList) {
|
||||
nodesDestroyList(pParamterList);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
code = nodesListStrictAppend(pProjectionList, pFunc);
|
||||
if (code) {
|
||||
nodesDestroyNode(pFunc);
|
||||
nodesDestroyList(pProjectionList);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = createSimpleSelectStmtFromProjList(pDb, pTable, pProjectionList, (SSelectStmt **)pQuery);
|
||||
if (code) {
|
||||
nodesDestroyList(pProjectionList);
|
||||
return code;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
|
||||
pCxt->createStream = true;
|
||||
STableMeta* pMeta = NULL;
|
||||
|
@ -6756,6 +6810,18 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
|
|||
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
|
||||
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && pStmt->pOptions->fillHistory) {
|
||||
SRealTableNode* pTable = (SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable);
|
||||
code = createLastTsSelectStmt(pTable->table.dbName, pTable->table.tableName, pTable->pMeta, &pStmt->pPrevQuery);
|
||||
/*
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
STranslateContext cxt = {0};
|
||||
int32_t code = initTranslateContext(pCxt->pParseCxt, pCxt->pMetaCache, &cxt);
|
||||
code = translateQuery(&cxt, pStmt->pPrevQuery);
|
||||
destroyTranslateContext(&cxt);
|
||||
}
|
||||
*/
|
||||
}
|
||||
taosMemoryFree(pMeta);
|
||||
return code;
|
||||
}
|
||||
|
@ -6822,13 +6888,80 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt*
|
|||
code = buildCreateStreamReq(pCxt, pStmt, &createReq);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, &createReq);
|
||||
if (NULL == pStmt->pPrevQuery) {
|
||||
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, &createReq);
|
||||
} else {
|
||||
pStmt->pReq = taosMemoryMalloc(sizeof(createReq));
|
||||
if (NULL == pStmt->pReq) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
memcpy(pStmt->pReq, &createReq, sizeof(createReq));
|
||||
memset(&createReq, 0, sizeof(createReq));
|
||||
TSWAP(pCxt->pPrevRoot, pStmt->pPrevQuery);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tFreeSCMCreateStreamReq(&createReq);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t buildIntervalForCreateStream(SCreateStreamStmt* pStmt, SInterval* pInterval) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery)) {
|
||||
return code;
|
||||
}
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||
if (NULL == pSelect->pWindow || QUERY_NODE_INTERVAL_WINDOW != nodeType(pSelect->pWindow)) {
|
||||
return code;
|
||||
}
|
||||
|
||||
SIntervalWindowNode* pWindow = (SIntervalWindowNode*)pSelect->pWindow;
|
||||
pInterval->interval = ((SValueNode*)pWindow->pInterval)->datum.i;
|
||||
pInterval->intervalUnit = ((SValueNode*)pWindow->pInterval)->unit;
|
||||
pInterval->offset = (NULL != pWindow->pOffset ? ((SValueNode*)pWindow->pOffset)->datum.i : 0);
|
||||
pInterval->sliding = (NULL != pWindow->pSliding ? ((SValueNode*)pWindow->pSliding)->datum.i : pInterval->interval);
|
||||
pInterval->slidingUnit =
|
||||
(NULL != pWindow->pSliding ? ((SValueNode*)pWindow->pSliding)->unit : pInterval->intervalUnit);
|
||||
pInterval->precision = ((SColumnNode*)pWindow->pCol)->node.resType.precision;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow) {
|
||||
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pQuery->pRoot;
|
||||
STranslateContext cxt = {0};
|
||||
SInterval interval = {0};
|
||||
|
||||
if (NULL == pResRow || NULL == pResRow[0]) {
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
int32_t code = initTranslateContext(pParseCxt, NULL, &cxt);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildIntervalForCreateStream(pStmt, &interval);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (interval.interval > 0) {
|
||||
pStmt->pReq->lastTs = taosTimeTruncate(*(int64_t*)pResRow[0], &interval);
|
||||
} else {
|
||||
pStmt->pReq->lastTs = *(int64_t*)pResRow[0];
|
||||
}
|
||||
code = buildCmdMsg(&cxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, pStmt->pReq);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setQuery(&cxt, pQuery);
|
||||
}
|
||||
setRefreshMate(&cxt, pQuery);
|
||||
destroyTranslateContext(&cxt);
|
||||
|
||||
tFreeSCMCreateStreamReq(pStmt->pReq);
|
||||
taosMemoryFreeClear(pStmt->pReq);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pStmt) {
|
||||
SMDropStreamReq dropReq = {0};
|
||||
SName name;
|
||||
|
@ -7509,8 +7642,7 @@ static SNodeList* createProjectCols(int32_t ncols, const char* const pCols[]) {
|
|||
return pProjections;
|
||||
}
|
||||
|
||||
static int32_t createSimpleSelectStmt(const char* pDb, const char* pTable, int32_t numOfProjs,
|
||||
const char* const pProjCol[], SSelectStmt** pStmt) {
|
||||
static int32_t createSimpleSelectStmtImpl(const char* pDb, const char* pTable, SNodeList* pProjectionList, SSelectStmt** pStmt) {
|
||||
SSelectStmt* pSelect = (SSelectStmt*)nodesMakeNode(QUERY_NODE_SELECT_STMT);
|
||||
if (NULL == pSelect) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -7526,27 +7658,38 @@ static int32_t createSimpleSelectStmt(const char* pDb, const char* pTable, int32
|
|||
snprintf(pRealTable->table.tableName, sizeof(pRealTable->table.tableName), "%s", pTable);
|
||||
snprintf(pRealTable->table.tableAlias, sizeof(pRealTable->table.tableAlias), "%s", pTable);
|
||||
pSelect->pFromTable = (SNode*)pRealTable;
|
||||
|
||||
if (numOfProjs >= 0) {
|
||||
pSelect->pProjectionList = createProjectCols(numOfProjs, pProjCol);
|
||||
if (NULL == pSelect->pProjectionList) {
|
||||
nodesDestroyNode((SNode*)pSelect);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
pSelect->pProjectionList = pProjectionList;
|
||||
|
||||
*pStmt = pSelect;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int32_t createSimpleSelectStmtFromCols(const char* pDb, const char* pTable, int32_t numOfProjs,
|
||||
const char* const pProjCol[], SSelectStmt** pStmt) {
|
||||
SNodeList* pProjectionList = NULL;
|
||||
if (numOfProjs >= 0) {
|
||||
pProjectionList = createProjectCols(numOfProjs, pProjCol);
|
||||
if (NULL == pProjectionList) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
return createSimpleSelectStmtImpl(pDb, pTable, pProjectionList, pStmt);
|
||||
}
|
||||
|
||||
static int32_t createSimpleSelectStmtFromProjList(const char* pDb, const char* pTable, SNodeList* pProjectionList, SSelectStmt** pStmt) {
|
||||
return createSimpleSelectStmtImpl(pDb, pTable, pProjectionList, pStmt);
|
||||
}
|
||||
|
||||
static int32_t createSelectStmtForShow(ENodeType showType, SSelectStmt** pStmt) {
|
||||
const SSysTableShowAdapter* pShow = &sysTableShowAdapter[showType - SYSTABLE_SHOW_TYPE_OFFSET];
|
||||
return createSimpleSelectStmt(pShow->pDbName, pShow->pTableName, pShow->numOfShowCols, pShow->pShowCols, pStmt);
|
||||
return createSimpleSelectStmtFromCols(pShow->pDbName, pShow->pTableName, pShow->numOfShowCols, pShow->pShowCols, pStmt);
|
||||
}
|
||||
|
||||
static int32_t createSelectStmtForShowTableDist(SShowTableDistributedStmt* pStmt, SSelectStmt** pOutput) {
|
||||
return createSimpleSelectStmt(pStmt->dbName, pStmt->tableName, 0, NULL, pOutput);
|
||||
return createSimpleSelectStmtFromCols(pStmt->dbName, pStmt->tableName, 0, NULL, pOutput);
|
||||
}
|
||||
|
||||
static int32_t createOperatorNode(EOperatorType opType, const char* pColName, SNode* pRight, SNode** pOp) {
|
||||
|
@ -7680,7 +7823,7 @@ static int32_t createShowTableTagsProjections(SNodeList** pProjections, SNodeLis
|
|||
static int32_t rewriteShowStableTags(STranslateContext* pCxt, SQuery* pQuery) {
|
||||
SShowTableTagsStmt* pShow = (SShowTableTagsStmt*)pQuery->pRoot;
|
||||
SSelectStmt* pSelect = NULL;
|
||||
int32_t code = createSimpleSelectStmt(((SValueNode*)pShow->pDbName)->literal, ((SValueNode*)pShow->pTbName)->literal,
|
||||
int32_t code = createSimpleSelectStmtFromCols(((SValueNode*)pShow->pDbName)->literal, ((SValueNode*)pShow->pTbName)->literal,
|
||||
-1, NULL, &pSelect);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createShowTableTagsProjections(&pSelect->pProjectionList, &pShow->pTags);
|
||||
|
@ -9005,6 +9148,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
}
|
||||
break;
|
||||
default:
|
||||
pQuery->haveResultSet = false;
|
||||
pQuery->execMode = QUERY_EXEC_MODE_RPC;
|
||||
if (NULL != pCxt->pCmdMsg) {
|
||||
TSWAP(pQuery->pCmdMsg, pCxt->pCmdMsg);
|
||||
|
@ -9039,6 +9183,10 @@ int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMe
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateQuery(&cxt, pQuery->pRoot);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && (cxt.pPrevRoot || cxt.pPostRoot)) {
|
||||
pQuery->pPrevRoot = cxt.pPrevRoot;
|
||||
pQuery->pPostRoot = cxt.pPostRoot;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setQuery(&cxt, pQuery);
|
||||
}
|
||||
|
|
|
@ -204,7 +204,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata
|
|||
const struct SMetaData* pMetaData, SQuery* pQuery) {
|
||||
SParseMetaCache metaCache = {0};
|
||||
int32_t code = nodesAcquireAllocator(pCxt->allocatorId);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (TSDB_CODE_SUCCESS == code && pCatalogReq) {
|
||||
code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -221,6 +221,19 @@ int32_t qContinueParseSql(SParseContext* pCxt, struct SCatalogReq* pCatalogReq,
|
|||
return parseInsertSql(pCxt, &pQuery, pCatalogReq, pMetaData);
|
||||
}
|
||||
|
||||
int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, void** pResRow) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (nodeType(pQuery->pRoot)) {
|
||||
case QUERY_NODE_CREATE_STREAM_STMT:
|
||||
code = translatePostCreateStream(pCxt, pQuery, pResRow);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void qDestroyParseContext(SParseContext* pCxt) {
|
||||
if (NULL == pCxt) {
|
||||
return;
|
||||
|
|
|
@ -97,6 +97,12 @@ static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDown
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qContinuePlanPostQuery(void *pPostPlan) {
|
||||
//TODO
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) {
|
||||
planDebug("QID:0x%" PRIx64 " set subplan execution node, groupId:%d", subplan->id.queryId, groupId);
|
||||
return setSubplanExecutionNode(subplan->pNode, groupId, pSource);
|
||||
|
|
Loading…
Reference in New Issue