Merge branch '3.0' into feature/3_liaohj
This commit is contained in:
commit
1f7821e64b
|
@ -251,6 +251,7 @@ typedef struct SRequestObj {
|
||||||
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
|
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
|
||||||
uint32_t retry;
|
uint32_t retry;
|
||||||
int64_t allocatorRefId;
|
int64_t allocatorRefId;
|
||||||
|
SQuery* pQuery;
|
||||||
} SRequestObj;
|
} SRequestObj;
|
||||||
|
|
||||||
typedef struct SSyncQueryParam {
|
typedef struct SSyncQueryParam {
|
||||||
|
|
|
@ -350,6 +350,7 @@ void doDestroyRequest(void *p) {
|
||||||
taosArrayDestroy(pRequest->tableList);
|
taosArrayDestroy(pRequest->tableList);
|
||||||
taosArrayDestroy(pRequest->dbList);
|
taosArrayDestroy(pRequest->dbList);
|
||||||
taosArrayDestroy(pRequest->targetTableList);
|
taosArrayDestroy(pRequest->targetTableList);
|
||||||
|
qDestroyQuery(pRequest->pQuery);
|
||||||
nodesDestroyAllocator(pRequest->allocatorRefId);
|
nodesDestroyAllocator(pRequest->allocatorRefId);
|
||||||
|
|
||||||
destroyQueryExecRes(&pRequest->body.resInfo.execRes);
|
destroyQueryExecRes(&pRequest->body.resInfo.execRes);
|
||||||
|
|
|
@ -670,7 +670,6 @@ typedef struct SqlParseWrapper {
|
||||||
SParseContext *pCtx;
|
SParseContext *pCtx;
|
||||||
SCatalogReq catalogReq;
|
SCatalogReq catalogReq;
|
||||||
SRequestObj *pRequest;
|
SRequestObj *pRequest;
|
||||||
SQuery *pQuery;
|
|
||||||
} SqlParseWrapper;
|
} SqlParseWrapper;
|
||||||
|
|
||||||
static void destoryTablesReq(void *p) {
|
static void destoryTablesReq(void *p) {
|
||||||
|
@ -696,8 +695,8 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
|
||||||
|
|
||||||
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
||||||
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
|
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
|
||||||
SQuery *pQuery = pWrapper->pQuery;
|
|
||||||
SRequestObj *pRequest = pWrapper->pRequest;
|
SRequestObj *pRequest = pWrapper->pRequest;
|
||||||
|
SQuery *pQuery = pRequest->pQuery;
|
||||||
|
|
||||||
pRequest->metric.ctgEnd = taosGetTimestampUs();
|
pRequest->metric.ctgEnd = taosGetTimestampUs();
|
||||||
|
|
||||||
|
@ -726,10 +725,10 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
||||||
tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, reqId:0x%" PRIx64, pRequest->self,
|
tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, reqId:0x%" PRIx64, pRequest->self,
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
launchAsyncQuery(pRequest, pQuery, pResultMeta);
|
launchAsyncQuery(pRequest, pQuery, pResultMeta);
|
||||||
qDestroyQuery(pQuery);
|
|
||||||
} else {
|
} else {
|
||||||
destorySqlParseWrapper(pWrapper);
|
destorySqlParseWrapper(pWrapper);
|
||||||
qDestroyQuery(pQuery);
|
qDestroyQuery(pRequest->pQuery);
|
||||||
|
pRequest->pQuery = NULL;
|
||||||
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
||||||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
|
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
|
||||||
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||||
|
@ -802,12 +801,10 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
SQuery *pQuery = NULL;
|
|
||||||
|
|
||||||
pRequest->metric.syntaxStart = taosGetTimestampUs();
|
pRequest->metric.syntaxStart = taosGetTimestampUs();
|
||||||
|
|
||||||
SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)};
|
SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)};
|
||||||
code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq);
|
code = qParseSqlSyntax(pCxt, &pRequest->pQuery, &catalogReq);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -817,9 +814,9 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||||
if (!updateMetaForce) {
|
if (!updateMetaForce) {
|
||||||
STscObj *pTscObj = pRequest->pTscObj;
|
STscObj *pTscObj = pRequest->pTscObj;
|
||||||
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||||
if (NULL == pQuery->pRoot) {
|
if (NULL == pRequest->pQuery->pRoot) {
|
||||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
|
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
|
||||||
} else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
|
} else if (QUERY_NODE_SELECT_STMT == pRequest->pQuery->pRoot->type) {
|
||||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
|
atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -831,7 +828,6 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pWrapper->pCtx = pCxt;
|
pWrapper->pCtx = pCxt;
|
||||||
pWrapper->pQuery = pQuery;
|
|
||||||
pWrapper->pRequest = pRequest;
|
pWrapper->pRequest = pRequest;
|
||||||
pWrapper->catalogReq = catalogReq;
|
pWrapper->catalogReq = catalogReq;
|
||||||
|
|
||||||
|
|
|
@ -1115,7 +1115,7 @@ static bool compareVal(const char* v, const SStateKeys* pKey) {
|
||||||
if (varDataLen(v) != varDataLen(pKey->pData)) {
|
if (varDataLen(v) != varDataLen(pKey->pData)) {
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
return strncmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0;
|
return memcmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return memcmp(pKey->pData, v, pKey->bytes) == 0;
|
return memcmp(pKey->pData, v, pKey->bytes) == 0;
|
||||||
|
|
Loading…
Reference in New Issue