Merge branch 'main' of https://github.com/taosdata/TDengine into main
This commit is contained in:
commit
6de705a58d
|
@ -81,10 +81,6 @@ Set<String> subscription() throws SQLException;
|
|||
|
||||
ConsumerRecords<V> poll(Duration timeout) throws SQLException;
|
||||
|
||||
void commitAsync();
|
||||
|
||||
void commitAsync(OffsetCommitCallback callback);
|
||||
|
||||
void commitSync() throws SQLException;
|
||||
|
||||
void close() throws SQLException;
|
||||
|
|
|
@ -36,15 +36,16 @@ REST connection supports all platforms that can run Java.
|
|||
|
||||
| taos-jdbcdriver version | major changes | TDengine version |
|
||||
| :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: |
|
||||
| 3.2.4 | Subscription add the enable.auto.commit parameter and the unsubscribe() method in the WebSocket connection | 3.0.5.0 or later |
|
||||
| 3.2.3 | Fixed resultSet data parsing failure in some cases | 3.0.5.0 or later |
|
||||
| 3.2.2 | subscription add seek function | 3.0.5.0 or later |
|
||||
| 3.2.2 | Subscription add seek function | 3.0.5.0 or later |
|
||||
| 3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket | 3.0.3.0 or later |
|
||||
| 3.2.0 | This version has been deprecated | - |
|
||||
| 3.1.0 | JDBC REST connection supports subscription over WebSocket | - |
|
||||
| 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment | - |
|
||||
| 3.0.0 | Support for TDengine 3.0 | 3.0.0.0 or later |
|
||||
| 2.0.42 | fix wasNull interface return value in WebSocket connection | - |
|
||||
| 2.0.41 | fix decode method of username and password in REST connection | - |
|
||||
| 2.0.42 | Fix wasNull interface return value in WebSocket connection | - |
|
||||
| 2.0.41 | Fix decode method of username and password in REST connection | - |
|
||||
| 2.0.39 - 2.0.40 | Add REST connection/request timeout parameters | - |
|
||||
| 2.0.38 | JDBC REST connections add bulk pull function | - |
|
||||
| 2.0.37 | Support json tags | - |
|
||||
|
|
|
@ -81,10 +81,6 @@ Set<String> subscription() throws SQLException;
|
|||
|
||||
ConsumerRecords<V> poll(Duration timeout) throws SQLException;
|
||||
|
||||
void commitAsync();
|
||||
|
||||
void commitAsync(OffsetCommitCallback callback);
|
||||
|
||||
void commitSync() throws SQLException;
|
||||
|
||||
void close() throws SQLException;
|
||||
|
|
|
@ -36,6 +36,7 @@ REST 连接支持所有能运行 Java 的平台。
|
|||
|
||||
| taos-jdbcdriver 版本 | 主要变化 | TDengine 版本 |
|
||||
| :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------: |
|
||||
| 3.2.4 | 数据订阅在 WebSocket 连接下增加 enable.auto.commit 参数,以及 unsubscribe() 方法。 | - |
|
||||
| 3.2.3 | 修复 ResultSet 在一些情况数据解析失败 | - |
|
||||
| 3.2.2 | 新增功能:数据订阅支持 seek 功能。 | 3.0.5.0 及更高版本 |
|
||||
| 3.2.1 | 新增功能:WebSocket 连接支持 schemaless 与 prepareStatement 写入。变更:consumer poll 返回结果集为 ConsumerRecord,可通过 value() 获取指定结果集数据。 | 3.0.3.0 及更高版本 |
|
||||
|
|
|
@ -46,6 +46,7 @@ typedef struct SRpcHandleInfo {
|
|||
int8_t noResp; // has response or not(default 0, 0: resp, 1: no resp)
|
||||
int8_t persistHandle; // persist handle or not
|
||||
int8_t hasEpSet;
|
||||
int32_t cliVer;
|
||||
|
||||
// app info
|
||||
void *ahandle; // app handle set by client
|
||||
|
@ -83,6 +84,7 @@ typedef struct SRpcInit {
|
|||
int32_t sessions; // number of sessions allowed
|
||||
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
|
||||
int32_t idleTime; // milliseconds, 0 means idle timer is disabled
|
||||
int32_t compatibilityVer;
|
||||
|
||||
int32_t retryMinInterval; // retry init interval
|
||||
int32_t retryStepFactor; // retry interval factor
|
||||
|
|
|
@ -29,6 +29,7 @@
|
|||
#include "trpc.h"
|
||||
#include "tsched.h"
|
||||
#include "ttime.h"
|
||||
#include "tversion.h"
|
||||
|
||||
#if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL)
|
||||
#include "cus_name.h"
|
||||
|
@ -111,7 +112,8 @@ static void deregisterRequest(SRequestObj *pRequest) {
|
|||
atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
|
||||
if (tsSlowLogScope & reqType) {
|
||||
taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 ", Duration:%" PRId64 "us, SQL:%s",
|
||||
taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration, pRequest->sqlstr);
|
||||
taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
|
||||
pRequest->sqlstr);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -175,6 +177,8 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
|
|||
rpcInit.connLimitNum = connLimitNum;
|
||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||
|
||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||
|
||||
void *pDnodeConn = rpcOpen(&rpcInit);
|
||||
if (pDnodeConn == NULL) {
|
||||
tscError("failed to init connection to server");
|
||||
|
@ -358,17 +362,16 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri
|
|||
|
||||
int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); }
|
||||
|
||||
|
||||
void destroySubRequests(SRequestObj *pRequest) {
|
||||
int32_t reqIdx = -1;
|
||||
int32_t reqIdx = -1;
|
||||
SRequestObj *pReqList[16] = {NULL};
|
||||
uint64_t tmpRefId = 0;
|
||||
uint64_t tmpRefId = 0;
|
||||
|
||||
if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
|
||||
return;
|
||||
}
|
||||
|
||||
SRequestObj* pTmp = pRequest;
|
||||
|
||||
SRequestObj *pTmp = pRequest;
|
||||
while (pTmp->relation.prevRefId) {
|
||||
tmpRefId = pTmp->relation.prevRefId;
|
||||
pTmp = acquireRequest(tmpRefId);
|
||||
|
@ -376,9 +379,9 @@ void destroySubRequests(SRequestObj *pRequest) {
|
|||
pReqList[++reqIdx] = pTmp;
|
||||
releaseRequest(tmpRefId);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self,
|
||||
tmpRefId, pTmp->requestId);
|
||||
break;
|
||||
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId,
|
||||
pTmp->requestId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -391,16 +394,15 @@ void destroySubRequests(SRequestObj *pRequest) {
|
|||
pTmp = acquireRequest(tmpRefId);
|
||||
if (pTmp) {
|
||||
tmpRefId = pTmp->relation.nextRefId;
|
||||
removeRequest(pTmp->self);
|
||||
removeRequest(pTmp->self);
|
||||
releaseRequest(pTmp->self);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 " is not there", tmpRefId);
|
||||
break;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void doDestroyRequest(void *p) {
|
||||
if (NULL == p) {
|
||||
return;
|
||||
|
@ -412,7 +414,7 @@ void doDestroyRequest(void *p) {
|
|||
tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
|
||||
|
||||
destroySubRequests(pRequest);
|
||||
|
||||
|
||||
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
|
||||
|
||||
schedulerFreeJob(&pRequest->body.queryJob, 0);
|
||||
|
@ -473,15 +475,15 @@ void taosStopQueryImpl(SRequestObj *pRequest) {
|
|||
}
|
||||
|
||||
void stopAllQueries(SRequestObj *pRequest) {
|
||||
int32_t reqIdx = -1;
|
||||
int32_t reqIdx = -1;
|
||||
SRequestObj *pReqList[16] = {NULL};
|
||||
uint64_t tmpRefId = 0;
|
||||
uint64_t tmpRefId = 0;
|
||||
|
||||
if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
|
||||
return;
|
||||
}
|
||||
|
||||
SRequestObj* pTmp = pRequest;
|
||||
|
||||
SRequestObj *pTmp = pRequest;
|
||||
while (pTmp->relation.prevRefId) {
|
||||
tmpRefId = pTmp->relation.prevRefId;
|
||||
pTmp = acquireRequest(tmpRefId);
|
||||
|
@ -489,9 +491,9 @@ void stopAllQueries(SRequestObj *pRequest) {
|
|||
pReqList[++reqIdx] = pTmp;
|
||||
releaseRequest(tmpRefId);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self,
|
||||
tmpRefId, pTmp->requestId);
|
||||
break;
|
||||
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId,
|
||||
pTmp->requestId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -510,12 +512,11 @@ void stopAllQueries(SRequestObj *pRequest) {
|
|||
releaseRequest(pTmp->self);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 " is not there", tmpRefId);
|
||||
break;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); }
|
||||
|
||||
static void *tscCrashReportThreadFp(void *param) {
|
||||
|
|
|
@ -26,7 +26,7 @@
|
|||
#include "tpagedbuf.h"
|
||||
#include "tref.h"
|
||||
#include "tsched.h"
|
||||
|
||||
#include "tversion.h"
|
||||
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
|
||||
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
|
||||
|
||||
|
@ -237,8 +237,9 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj** pNewRequest) {
|
||||
int32_t code = buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
|
||||
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
|
||||
int32_t code =
|
||||
buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pRequest->relation.prevRefId = (*pNewRequest)->self;
|
||||
(*pNewRequest)->relation.nextRefId = pRequest->self;
|
||||
|
@ -502,8 +503,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
|
|||
pResInfo->userFields[i].bytes = pSchema[i].bytes;
|
||||
pResInfo->userFields[i].type = pSchema[i].type;
|
||||
|
||||
if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR ||
|
||||
pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||
if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR || pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||
pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE;
|
||||
} else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) {
|
||||
pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
||||
|
@ -891,7 +891,7 @@ static bool incompletaFileParsing(SNode* pStmt) {
|
|||
|
||||
void continuePostSubQuery(SRequestObj* pRequest, TAOS_ROW row) {
|
||||
SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;
|
||||
int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
|
||||
int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
int64_t analyseStart = taosGetTimestampUs();
|
||||
code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, (void**)row);
|
||||
|
@ -934,7 +934,7 @@ void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
|
|||
|
||||
TAOS_ROW row = NULL;
|
||||
if (rowNum > 0) {
|
||||
row = taos_fetch_row(res); // for single row only now
|
||||
row = taos_fetch_row(res); // for single row only now
|
||||
}
|
||||
|
||||
SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
|
||||
|
@ -2135,6 +2135,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
|
|||
connLimitNum = TMIN(connLimitNum, 500);
|
||||
rpcInit.connLimitNum = connLimitNum;
|
||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||
|
||||
clientRpc = rpcOpen(&rpcInit);
|
||||
if (clientRpc == NULL) {
|
||||
|
@ -2494,11 +2495,10 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly,
|
|||
return pRequest;
|
||||
}
|
||||
|
||||
static void fetchCallback(void* pResult, void* param, int32_t code) {
|
||||
SRequestObj* pRequest = (SRequestObj*)param;
|
||||
|
||||
static void fetchCallback(void *pResult, void *param, int32_t code) {
|
||||
SRequestObj *pRequest = (SRequestObj *)param;
|
||||
|
||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
||||
|
||||
tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
|
||||
tstrerror(code), pRequest->requestId);
|
||||
|
@ -2520,7 +2520,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
|
|||
}
|
||||
|
||||
pRequest->code =
|
||||
setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, true);
|
||||
setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, true);
|
||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
pResultInfo->numOfRows = 0;
|
||||
pRequest->code = code;
|
||||
|
@ -2531,19 +2531,19 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
|
|||
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
|
||||
pRequest->requestId);
|
||||
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||
atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
|
||||
}
|
||||
|
||||
pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows);
|
||||
}
|
||||
|
||||
void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param) {
|
||||
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
|
||||
pRequest->body.fetchFp = fp;
|
||||
pRequest->body.param = param;
|
||||
|
||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
||||
|
||||
// this query has no results or error exists, return directly
|
||||
if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2578,5 +2578,3 @@ void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param
|
|||
|
||||
schedulerFetchRows(pRequest->body.queryJob, &req);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -558,13 +558,12 @@ int taos_select_db(TAOS *taos, const char *db) {
|
|||
return code;
|
||||
}
|
||||
|
||||
|
||||
void taos_stop_query(TAOS_RES *res) {
|
||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
return;
|
||||
}
|
||||
|
||||
stopAllQueries((SRequestObj*)res);
|
||||
stopAllQueries((SRequestObj *)res);
|
||||
}
|
||||
|
||||
bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
|
||||
|
@ -785,7 +784,7 @@ void destorySqlCallbackWrapper(SSqlCallbackWrapper *pWrapper) {
|
|||
taosMemoryFree(pWrapper);
|
||||
}
|
||||
|
||||
void destroyCtxInRequest(SRequestObj* pRequest) {
|
||||
void destroyCtxInRequest(SRequestObj *pRequest) {
|
||||
schedulerFreeJob(&pRequest->body.queryJob, 0);
|
||||
qDestroyQuery(pRequest->pQuery);
|
||||
pRequest->pQuery = NULL;
|
||||
|
@ -793,7 +792,6 @@ void destroyCtxInRequest(SRequestObj* pRequest) {
|
|||
pRequest->pWrapper = NULL;
|
||||
}
|
||||
|
||||
|
||||
static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t code) {
|
||||
SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param;
|
||||
SRequestObj *pRequest = pWrapper->pRequest;
|
||||
|
@ -807,15 +805,15 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = qAnalyseSqlSemantic(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery);
|
||||
}
|
||||
|
||||
|
||||
pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
|
||||
|
||||
|
||||
handleQueryAnslyseRes(pWrapper, pResultMeta, code);
|
||||
}
|
||||
|
||||
int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SCatalogReq* pTarget = taosMemoryCalloc(1, sizeof(SCatalogReq));
|
||||
int32_t cloneCatalogReq(SCatalogReq **ppTarget, SCatalogReq *pSrc) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SCatalogReq *pTarget = taosMemoryCalloc(1, sizeof(SCatalogReq));
|
||||
if (pTarget == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
|
@ -842,17 +840,16 @@ int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) {
|
|||
return code;
|
||||
}
|
||||
|
||||
|
||||
void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode* pRoot) {
|
||||
SRequestObj* pNewRequest = NULL;
|
||||
SSqlCallbackWrapper* pNewWrapper = NULL;
|
||||
int32_t code = buildPreviousRequest(pWrapper->pRequest, pWrapper->pRequest->sqlstr, &pNewRequest);
|
||||
void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode *pRoot) {
|
||||
SRequestObj *pNewRequest = NULL;
|
||||
SSqlCallbackWrapper *pNewWrapper = NULL;
|
||||
int32_t code = buildPreviousRequest(pWrapper->pRequest, pWrapper->pRequest->sqlstr, &pNewRequest);
|
||||
if (code) {
|
||||
handleQueryAnslyseRes(pWrapper, pResultMeta, code);
|
||||
return;
|
||||
}
|
||||
|
||||
pNewRequest->pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
|
||||
pNewRequest->pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY);
|
||||
if (NULL == pNewRequest->pQuery) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
|
@ -871,16 +868,16 @@ void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResult
|
|||
}
|
||||
|
||||
void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code) {
|
||||
SRequestObj *pRequest = pWrapper->pRequest;
|
||||
SQuery *pQuery = pRequest->pQuery;
|
||||
SRequestObj *pRequest = pWrapper->pRequest;
|
||||
SQuery *pQuery = pRequest->pQuery;
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS && pQuery->pPrevRoot) {
|
||||
SNode* prevRoot = pQuery->pPrevRoot;
|
||||
SNode *prevRoot = pQuery->pPrevRoot;
|
||||
pQuery->pPrevRoot = NULL;
|
||||
handleSubQueryFromAnalyse(pWrapper, pResultMeta, prevRoot);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pRequest->stableQuery = pQuery->stableQuery;
|
||||
if (pQuery->pRoot) {
|
||||
|
@ -1043,7 +1040,7 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
|
|||
}
|
||||
|
||||
int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
SSqlCallbackWrapper *pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
|
||||
if (pWrapper == NULL) {
|
||||
|
@ -1081,7 +1078,6 @@ int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *p
|
|||
return code;
|
||||
}
|
||||
|
||||
|
||||
void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||
SSqlCallbackWrapper *pWrapper = NULL;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -1128,12 +1124,12 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
|||
}
|
||||
|
||||
void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
|
||||
int32_t reqIdx = 0;
|
||||
int32_t reqIdx = 0;
|
||||
SRequestObj *pReqList[16] = {NULL};
|
||||
SRequestObj *pUserReq = NULL;
|
||||
pReqList[0] = pRequest;
|
||||
uint64_t tmpRefId = 0;
|
||||
SRequestObj* pTmp = pRequest;
|
||||
uint64_t tmpRefId = 0;
|
||||
SRequestObj *pTmp = pRequest;
|
||||
while (pTmp->relation.prevRefId) {
|
||||
tmpRefId = pTmp->relation.prevRefId;
|
||||
pTmp = acquireRequest(tmpRefId);
|
||||
|
@ -1141,9 +1137,9 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
|
|||
pReqList[++reqIdx] = pTmp;
|
||||
releaseRequest(tmpRefId);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self,
|
||||
tmpRefId, pTmp->requestId);
|
||||
break;
|
||||
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId,
|
||||
pTmp->requestId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1152,11 +1148,11 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
|
|||
pTmp = acquireRequest(tmpRefId);
|
||||
if (pTmp) {
|
||||
tmpRefId = pTmp->relation.nextRefId;
|
||||
removeRequest(pTmp->self);
|
||||
removeRequest(pTmp->self);
|
||||
releaseRequest(pTmp->self);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 " is not there", tmpRefId);
|
||||
break;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -423,7 +423,7 @@ int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) {
|
|||
val = (const void *)&pColVal->value.val;
|
||||
}
|
||||
} else {
|
||||
pColVal = NULL;
|
||||
// pColVal = NULL;
|
||||
valType = TD_VTYPE_NONE;
|
||||
}
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "dmMgmt.h"
|
||||
#include "qworker.h"
|
||||
#include "tversion.h"
|
||||
|
||||
static inline void dmSendRsp(SRpcMsg *pMsg) { rpcSendResponse(pMsg); }
|
||||
|
||||
|
@ -73,6 +74,13 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
|
||||
pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
|
||||
|
||||
int32_t svrVer = 0;
|
||||
taosVersionStrToInt(version, &svrVer);
|
||||
if (0 != taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) {
|
||||
dError("Version not compatible, cli ver: %d, svr ver: %d", pRpc->info.cliVer, svrVer);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
switch (pRpc->msgType) {
|
||||
case TDMT_DND_NET_TEST:
|
||||
dmProcessNetTestReq(pDnode, pRpc);
|
||||
|
@ -305,6 +313,7 @@ int32_t dmInitClient(SDnode *pDnode) {
|
|||
rpcInit.supportBatch = 1;
|
||||
rpcInit.batchSize = 8 * 1024;
|
||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||
|
||||
pTrans->clientRpc = rpcOpen(&rpcInit);
|
||||
if (pTrans->clientRpc == NULL) {
|
||||
|
@ -339,7 +348,7 @@ int32_t dmInitServer(SDnode *pDnode) {
|
|||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||
rpcInit.parent = pDnode;
|
||||
rpcInit.compressSize = tsCompressMsgSize;
|
||||
|
||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||
pTrans->serverRpc = rpcOpen(&rpcInit);
|
||||
if (pTrans->serverRpc == NULL) {
|
||||
dError("failed to init dnode rpc server");
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include "sut.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tmisce.h"
|
||||
#include "tversion.h"
|
||||
|
||||
static void processClientRsp(void* parent, SRpcMsg* pRsp, SEpSet* pEpSet) {
|
||||
TestClient* client = (TestClient*)parent;
|
||||
|
@ -53,6 +54,7 @@ void TestClient::DoInit() {
|
|||
rpcInit.parent = this;
|
||||
// rpcInit.secret = (char*)secretEncrypt;
|
||||
// rpcInit.spi = 1;
|
||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||
|
||||
clientRpc = rpcOpen(&rpcInit);
|
||||
ASSERT(clientRpc);
|
||||
|
|
|
@ -207,7 +207,10 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
|||
tb_uid_t uid = *(tb_uid_t *)pData;
|
||||
tdbFree(pData);
|
||||
SMetaInfo info;
|
||||
metaGetInfo(pMeta, uid, &info, NULL);
|
||||
if (metaGetInfo(pMeta, uid, &info, NULL) == TSDB_CODE_NOT_FOUND) {
|
||||
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
if (info.uid == info.suid) {
|
||||
return 0;
|
||||
} else {
|
||||
|
@ -939,7 +942,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t timePointMs, SArray *tbUids) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
metaInfo("ttl find expired table count: %zu" , TARRAY_SIZE(tbUids));
|
||||
metaInfo("ttl find expired table count: %zu", TARRAY_SIZE(tbUids));
|
||||
|
||||
metaDropTables(pMeta, tbUids);
|
||||
return 0;
|
||||
|
|
|
@ -1031,7 +1031,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
|
|||
|
||||
return code;
|
||||
}
|
||||
|
||||
/*
|
||||
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
|
||||
int32_t code = 0;
|
||||
SLRUCache *pCache = pTsdb->lruCache;
|
||||
|
@ -1079,7 +1079,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
|
|||
|
||||
return code;
|
||||
}
|
||||
|
||||
*/
|
||||
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
|
||||
int32_t code = 0;
|
||||
// fetch schema
|
||||
|
@ -1829,10 +1829,11 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
|
|||
}
|
||||
|
||||
*pIgnoreEarlierTs = false;
|
||||
/*
|
||||
if (!hasVal) {
|
||||
state->state = SFSLASTNEXTROW_FILESET;
|
||||
}
|
||||
|
||||
*/
|
||||
if (!state->checkRemainingRow) {
|
||||
state->checkRemainingRow = true;
|
||||
}
|
||||
|
@ -2020,10 +2021,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
|||
tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk);
|
||||
if (block.maxKey.ts <= state->lastTs) {
|
||||
*pIgnoreEarlierTs = true;
|
||||
if (state->pBlockData) {
|
||||
tBlockDataDestroy(state->pBlockData);
|
||||
state->pBlockData = NULL;
|
||||
}
|
||||
|
||||
tBlockDataDestroy(state->pBlockData);
|
||||
state->pBlockData = NULL;
|
||||
|
||||
*ppRow = NULL;
|
||||
return code;
|
||||
|
@ -3176,97 +3176,46 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
|
|||
|
||||
TSKEY rowTs = TSDBROW_TS(pRow);
|
||||
|
||||
if (lastRowTs == TSKEY_MAX) {
|
||||
lastRowTs = rowTs;
|
||||
lastRowTs = rowTs;
|
||||
|
||||
for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
|
||||
if (iCol >= nLastCol) {
|
||||
break;
|
||||
}
|
||||
SLastCol *pCol = taosArrayGet(pColArray, iCol);
|
||||
if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
|
||||
continue;
|
||||
}
|
||||
if (slotIds[iCol] == 0) {
|
||||
STColumn *pTColumn = &pTSchema->columns[0];
|
||||
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = rowTs});
|
||||
taosArraySet(pColArray, 0, &(SLastCol){.ts = rowTs, .colVal = *pColVal});
|
||||
continue;
|
||||
}
|
||||
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||
|
||||
*pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal};
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type) /*&& pColVal->value.nData > 0*/) {
|
||||
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
|
||||
if (pCol->colVal.value.pData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
if (pColVal->value.nData > 0) {
|
||||
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||
}
|
||||
}
|
||||
|
||||
/*if (COL_VAL_IS_NONE(pColVal)) {
|
||||
if (!setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
}
|
||||
} else {*/
|
||||
int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
|
||||
if (aColIndex >= 0) {
|
||||
taosArrayRemove(aColArray, aColIndex);
|
||||
}
|
||||
//}
|
||||
}
|
||||
if (!setNoneCol) {
|
||||
// done, goto return pColArray
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// merge into pColArray
|
||||
setNoneCol = false;
|
||||
for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
|
||||
if (iCol >= nLastCol) {
|
||||
break;
|
||||
}
|
||||
// high version's column value
|
||||
SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
|
||||
if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
|
||||
SLastCol *pCol = taosArrayGet(pColArray, iCol);
|
||||
if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
|
||||
continue;
|
||||
}
|
||||
SColVal *tColVal = &lastColVal->colVal;
|
||||
if (slotIds[iCol] == 0) {
|
||||
STColumn *pTColumn = &pTSchema->columns[0];
|
||||
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = rowTs});
|
||||
taosArraySet(pColArray, 0, &(SLastCol){.ts = rowTs, .colVal = *pColVal});
|
||||
continue;
|
||||
}
|
||||
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
|
||||
SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal};
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
|
||||
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
|
||||
taosMemoryFree(pLastCol->colVal.value.pData);
|
||||
|
||||
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
|
||||
if (lastCol.colVal.value.pData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||
*pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal};
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type) /*&& pColVal->value.nData > 0*/) {
|
||||
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
|
||||
if (pCol->colVal.value.pData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
if (pColVal->value.nData > 0) {
|
||||
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||
}
|
||||
}
|
||||
|
||||
taosArraySet(pColArray, iCol, &lastCol);
|
||||
int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
|
||||
int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
|
||||
if (aColIndex >= 0) {
|
||||
taosArrayRemove(aColArray, aColIndex);
|
||||
} else if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal) && !setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
}
|
||||
}
|
||||
} while (setNoneCol);
|
||||
|
||||
break;
|
||||
} while (1);
|
||||
|
||||
if (!hasRow) {
|
||||
if (ignoreEarlierTs) {
|
||||
|
|
|
@ -87,7 +87,7 @@ static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static void setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock, int32_t numOfRows, STableMeta* pMeta) {
|
||||
static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock, int32_t numOfRows, STableMeta* pMeta) {
|
||||
blockDataEnsureCapacity(pBlock, numOfRows);
|
||||
pBlock->info.rows = 0;
|
||||
|
||||
|
@ -114,6 +114,11 @@ static void setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock, in
|
|||
colDataSetVal(pCol4, pBlock->info.rows, buf, false);
|
||||
++(pBlock->info.rows);
|
||||
}
|
||||
if (pBlock->info.rows <= 0) {
|
||||
qError("no permission to view any columns");
|
||||
return TSDB_CODE_PAR_PERMISSION_DENIED;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp** pRsp) {
|
||||
|
@ -123,7 +128,7 @@ static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp**
|
|||
SSDataBlock* pBlock = NULL;
|
||||
int32_t code = buildDescResultDataBlock(&pBlock);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
setDescResultIntoDataBlock(sysInfoUser, pBlock, numOfRows, pDesc->pMeta);
|
||||
code = setDescResultIntoDataBlock(sysInfoUser, pBlock, numOfRows, pDesc->pMeta);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildRetrieveTableRsp(pBlock, DESCRIBE_RESULT_COLS, pRsp);
|
||||
|
|
|
@ -29,6 +29,7 @@
|
|||
#include "tmsg.h"
|
||||
#include "trpc.h"
|
||||
#include "tmisce.h"
|
||||
#include "tversion.h"
|
||||
// clang-format on
|
||||
|
||||
#define UDFD_MAX_SCRIPT_PLUGINS 64
|
||||
|
@ -1038,7 +1039,7 @@ int32_t udfdOpenClientRpc() {
|
|||
connLimitNum = TMIN(connLimitNum, 500);
|
||||
rpcInit.connLimitNum = connLimitNum;
|
||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||
|
||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||
global.clientRpc = rpcOpen(&rpcInit);
|
||||
if (global.clientRpc == NULL) {
|
||||
fnError("failed to init dnode rpc client");
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "tglobal.h"
|
||||
#include "ttimer.h"
|
||||
#include "tutil.h"
|
||||
#include "tversion.h"
|
||||
|
||||
bool gRaftDetailLog = false;
|
||||
SSyncIO *gSyncIO = NULL;
|
||||
|
@ -188,7 +189,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
|
|||
rpcInit.idleTime = 100;
|
||||
rpcInit.user = "sync-io";
|
||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||
|
||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||
io->clientRpc = rpcOpen(&rpcInit);
|
||||
if (io->clientRpc == NULL) {
|
||||
sError("failed to initialize RPC");
|
||||
|
@ -209,7 +210,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
|
|||
rpcInit.idleTime = 2 * 1500;
|
||||
rpcInit.parent = io;
|
||||
rpcInit.connType = TAOS_CONN_SERVER;
|
||||
|
||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||
void *pRpc = rpcOpen(&rpcInit);
|
||||
if (pRpc == NULL) {
|
||||
sError("failed to start RPC server");
|
||||
|
@ -470,11 +471,10 @@ static void syncIOTickPing(void *param, void *tmrId) {
|
|||
taosTmrReset(syncIOTickPing, io->pingTimerMS, io, io->timerMgr, &io->pingTimer);
|
||||
}
|
||||
|
||||
void syncEntryDestory(SSyncRaftEntry* pEntry) {}
|
||||
void syncEntryDestory(SSyncRaftEntry *pEntry) {}
|
||||
|
||||
|
||||
void syncUtilMsgNtoH(void* msg) {
|
||||
SMsgHead* pHead = msg;
|
||||
void syncUtilMsgNtoH(void *msg) {
|
||||
SMsgHead *pHead = msg;
|
||||
pHead->contLen = ntohl(pHead->contLen);
|
||||
pHead->vgId = ntohl(pHead->vgId);
|
||||
}
|
||||
|
@ -487,9 +487,9 @@ static inline bool syncUtilCanPrint(char c) {
|
|||
}
|
||||
}
|
||||
|
||||
char* syncUtilPrintBin(char* ptr, uint32_t len) {
|
||||
char *syncUtilPrintBin(char *ptr, uint32_t len) {
|
||||
int64_t memLen = (int64_t)(len + 1);
|
||||
char* s = taosMemoryMalloc(memLen);
|
||||
char *s = taosMemoryMalloc(memLen);
|
||||
ASSERT(s != NULL);
|
||||
memset(s, 0, len + 1);
|
||||
memcpy(s, ptr, len);
|
||||
|
@ -502,13 +502,13 @@ char* syncUtilPrintBin(char* ptr, uint32_t len) {
|
|||
return s;
|
||||
}
|
||||
|
||||
char* syncUtilPrintBin2(char* ptr, uint32_t len) {
|
||||
char *syncUtilPrintBin2(char *ptr, uint32_t len) {
|
||||
uint32_t len2 = len * 4 + 1;
|
||||
char* s = taosMemoryMalloc(len2);
|
||||
char *s = taosMemoryMalloc(len2);
|
||||
ASSERT(s != NULL);
|
||||
memset(s, 0, len2);
|
||||
|
||||
char* p = s;
|
||||
char *p = s;
|
||||
for (int32_t i = 0; i < len; ++i) {
|
||||
int32_t n = sprintf(p, "%d,", ptr[i]);
|
||||
p += n;
|
||||
|
@ -516,7 +516,7 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len) {
|
|||
return s;
|
||||
}
|
||||
|
||||
void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port) {
|
||||
void syncUtilU642Addr(uint64_t u64, char *host, int64_t len, uint16_t *port) {
|
||||
uint32_t hostU32 = (uint32_t)((u64 >> 32) & 0x00000000FFFFFFFF);
|
||||
|
||||
struct in_addr addr = {.s_addr = hostU32};
|
||||
|
@ -524,7 +524,7 @@ void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port) {
|
|||
*port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16);
|
||||
}
|
||||
|
||||
uint64_t syncUtilAddr2U64(const char* host, uint16_t port) {
|
||||
uint64_t syncUtilAddr2U64(const char *host, uint16_t port) {
|
||||
uint32_t hostU32 = taosGetIpv4FromFqdn(host);
|
||||
if (hostU32 == (uint32_t)-1) {
|
||||
sError("failed to resolve ipv4 addr, host:%s", host);
|
||||
|
|
|
@ -345,7 +345,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
|
|||
}
|
||||
*ppKey = pTKey;
|
||||
*pkLen = cd.kLen;
|
||||
memcpy(*ppKey, cd.pKey, cd.kLen);
|
||||
memcpy(*ppKey, cd.pKey, (size_t)cd.kLen);
|
||||
}
|
||||
|
||||
if (ppVal) {
|
||||
|
@ -357,7 +357,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
|
|||
}
|
||||
*ppVal = pTVal;
|
||||
*vLen = cd.vLen;
|
||||
memcpy(*ppVal, cd.pVal, cd.vLen);
|
||||
memcpy(*ppVal, cd.pVal, (size_t)cd.vLen);
|
||||
}
|
||||
|
||||
if (TDB_CELLDECODER_FREE_KEY(&cd)) {
|
||||
|
@ -1793,7 +1793,7 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
|
|||
|
||||
*ppKey = pKey;
|
||||
*kLen = cd.kLen;
|
||||
memcpy(pKey, cd.pKey, cd.kLen);
|
||||
memcpy(pKey, cd.pKey, (size_t)cd.kLen);
|
||||
|
||||
if (ppVal) {
|
||||
if (cd.vLen > 0) {
|
||||
|
@ -1852,7 +1852,7 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
|
|||
|
||||
*ppKey = pKey;
|
||||
*kLen = cd.kLen;
|
||||
memcpy(pKey, cd.pKey, cd.kLen);
|
||||
memcpy(pKey, cd.pKey, (size_t)cd.kLen);
|
||||
|
||||
if (ppVal) {
|
||||
// TODO: vLen may be zero
|
||||
|
@ -1864,7 +1864,7 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
|
|||
|
||||
*ppVal = pVal;
|
||||
*vLen = cd.vLen;
|
||||
memcpy(pVal, cd.pVal, cd.vLen);
|
||||
memcpy(pVal, cd.pVal, (size_t)cd.vLen);
|
||||
}
|
||||
|
||||
ret = tdbBtcMoveToPrev(pBtc);
|
||||
|
|
|
@ -62,7 +62,10 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, i
|
|||
}
|
||||
memset(pDb->pgrHash, 0, tsize);
|
||||
|
||||
taosMulModeMkDir(dbname, 0755);
|
||||
ret = taosMulModeMkDir(dbname, 0755);
|
||||
if (ret < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
#ifdef USE_MAINDB
|
||||
// open main db
|
||||
|
|
|
@ -980,6 +980,7 @@ int tdbPagerRestoreJournals(SPager *pPager) {
|
|||
jname[dirLen] = '/';
|
||||
sprintf(jname + dirLen + 1, TDB_MAINDB_NAME "-journal.%" PRId64, *pTxnId);
|
||||
if (tdbPagerRestore(pPager, jname) < 0) {
|
||||
taosArrayDestroy(pTxnList);
|
||||
tdbCloseDir(&pDir);
|
||||
|
||||
tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), jname);
|
||||
|
|
|
@ -154,6 +154,7 @@ typedef struct {
|
|||
|
||||
#pragma pack(push, 1)
|
||||
|
||||
#define TRANS_VER 2
|
||||
typedef struct {
|
||||
char version : 4; // RPC version
|
||||
char comp : 2; // compression algorithm, 0:no compression 1:lz4
|
||||
|
@ -166,6 +167,7 @@ typedef struct {
|
|||
|
||||
uint64_t timestamp;
|
||||
char user[TSDB_UNI_LEN];
|
||||
int32_t compatibilityVer;
|
||||
uint32_t magicNum;
|
||||
STraceId traceId;
|
||||
uint64_t ahandle; // ahandle assigned by client
|
||||
|
|
|
@ -46,10 +46,10 @@ typedef struct {
|
|||
int8_t connType;
|
||||
char label[TSDB_LABEL_LEN];
|
||||
char user[TSDB_UNI_LEN]; // meter ID
|
||||
|
||||
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
|
||||
int8_t encryption; // encrypt or not
|
||||
|
||||
int32_t compatibilityVer;
|
||||
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
|
||||
int8_t encryption; // encrypt or not
|
||||
|
||||
int32_t retryMinInterval; // retry init interval
|
||||
int32_t retryStepFactor; // retry interval factor
|
||||
int32_t retryMaxInterval; // retry max interval
|
||||
|
|
|
@ -50,6 +50,7 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
}
|
||||
|
||||
pRpc->encryption = pInit->encryption;
|
||||
pRpc->compatibilityVer = pInit->compatibilityVer;
|
||||
|
||||
pRpc->retryMinInterval = pInit->retryMinInterval; // retry init interval
|
||||
pRpc->retryStepFactor = pInit->retryStepFactor;
|
||||
|
|
|
@ -391,6 +391,7 @@ void cliHandleResp(SCliConn* conn) {
|
|||
transMsg.info.ahandle = NULL;
|
||||
transMsg.info.traceId = pHead->traceId;
|
||||
transMsg.info.hasEpSet = pHead->hasEpSet;
|
||||
transMsg.info.cliVer = htonl(pHead->compatibilityVer);
|
||||
|
||||
SCliMsg* pMsg = NULL;
|
||||
STransConnCtx* pCtx = NULL;
|
||||
|
@ -488,6 +489,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
|
|||
transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code;
|
||||
transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
|
||||
transMsg.info.ahandle = NULL;
|
||||
transMsg.info.cliVer = pTransInst->compatibilityVer;
|
||||
|
||||
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
|
||||
transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
|
||||
|
@ -984,11 +986,10 @@ void cliSendBatch(SCliConn* pConn) {
|
|||
SCliThrd* pThrd = pConn->hostThrd;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
|
||||
SCliBatch* pBatch = pConn->pBatch;
|
||||
SCliBatchList* pList = pBatch->pList;
|
||||
pList->connCnt += 1;
|
||||
SCliBatch* pBatch = pConn->pBatch;
|
||||
int32_t wLen = pBatch->wLen;
|
||||
|
||||
int32_t wLen = pBatch->wLen;
|
||||
pBatch->pList->connCnt += 1;
|
||||
|
||||
uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t));
|
||||
int i = 0;
|
||||
|
@ -1018,6 +1019,8 @@ void cliSendBatch(SCliConn* pConn) {
|
|||
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
|
||||
pHead->traceId = pMsg->info.traceId;
|
||||
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
|
||||
pHead->version = TRANS_VER;
|
||||
pHead->compatibilityVer = htonl(pTransInst->compatibilityVer);
|
||||
}
|
||||
pHead->timestamp = taosHton64(taosGetTimestampUs());
|
||||
|
||||
|
@ -1074,6 +1077,8 @@ void cliSend(SCliConn* pConn) {
|
|||
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
|
||||
pHead->traceId = pMsg->info.traceId;
|
||||
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
|
||||
pHead->version = TRANS_VER;
|
||||
pHead->compatibilityVer = htonl(pTransInst->compatibilityVer);
|
||||
}
|
||||
pHead->timestamp = taosHton64(taosGetTimestampUs());
|
||||
|
||||
|
@ -1346,6 +1351,7 @@ static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
transMsg.info.ahandle = pMsg->ctx->ahandle;
|
||||
transMsg.info.traceId = pMsg->msg.info.traceId;
|
||||
transMsg.info.hasEpSet = false;
|
||||
transMsg.info.cliVer = pTransInst->compatibilityVer;
|
||||
if (pCtx->pSem != NULL) {
|
||||
if (pCtx->pRsp == NULL) {
|
||||
} else {
|
||||
|
@ -1527,6 +1533,9 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
// persist conn already release by server
|
||||
STransMsg resp;
|
||||
cliBuildExceptResp(pMsg, &resp);
|
||||
// refactorr later
|
||||
resp.info.cliVer = pTransInst->compatibilityVer;
|
||||
|
||||
if (pMsg->type != Release) {
|
||||
pTransInst->cfp(pTransInst->parent, &resp, NULL);
|
||||
}
|
||||
|
@ -1836,6 +1845,7 @@ void cliIteraConnMsgs(SCliConn* conn) {
|
|||
if (-1 == cliBuildExceptResp(cmsg, &resp)) {
|
||||
continue;
|
||||
}
|
||||
resp.info.cliVer = pTransInst->compatibilityVer;
|
||||
pTransInst->cfp(pTransInst->parent, &resp, NULL);
|
||||
|
||||
cmsg->ctx->ahandle = NULL;
|
||||
|
|
|
@ -192,7 +192,7 @@ bool transReadComplete(SConnBuffer* connBuf) {
|
|||
memcpy((char*)&head, connBuf->buf, sizeof(head));
|
||||
int32_t msgLen = (int32_t)htonl(head.msgLen);
|
||||
p->total = msgLen;
|
||||
p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum));
|
||||
p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum)) || head.version != TRANS_VER;
|
||||
}
|
||||
if (p->total >= p->len) {
|
||||
p->left = p->total - p->len;
|
||||
|
|
|
@ -196,6 +196,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
|
|||
tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn);
|
||||
return false;
|
||||
}
|
||||
tDebug("head version: %d 2", pHead->version);
|
||||
|
||||
pHead->code = htonl(pHead->code);
|
||||
pHead->msgLen = htonl(pHead->msgLen);
|
||||
|
@ -236,8 +237,8 @@ static bool uvHandleReq(SSvrConn* pConn) {
|
|||
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
||||
transRefSrvHandle(pConn);
|
||||
if (cost >= EXCEPTION_LIMIT_US) {
|
||||
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception", transLabel(pTransInst),
|
||||
pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost);
|
||||
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception",
|
||||
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost);
|
||||
} else {
|
||||
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus", transLabel(pTransInst), pConn,
|
||||
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost);
|
||||
|
@ -245,8 +246,8 @@ static bool uvHandleReq(SSvrConn* pConn) {
|
|||
} else {
|
||||
if (cost >= EXCEPTION_LIMIT_US) {
|
||||
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception",
|
||||
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp,
|
||||
transMsg.code, (int)(cost));
|
||||
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp,
|
||||
transMsg.code, (int)(cost));
|
||||
} else {
|
||||
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus",
|
||||
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp,
|
||||
|
@ -262,6 +263,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
|
|||
transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId);
|
||||
transMsg.info.refId = pConn->refId;
|
||||
transMsg.info.traceId = pHead->traceId;
|
||||
transMsg.info.cliVer = htonl(pHead->compatibilityVer);
|
||||
|
||||
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn,
|
||||
pConn->refId);
|
||||
|
@ -410,6 +412,8 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
|||
pHead->traceId = pMsg->info.traceId;
|
||||
pHead->hasEpSet = pMsg->info.hasEpSet;
|
||||
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
|
||||
pHead->compatibilityVer = htonl(((STrans*)pConn->pTransInst)->compatibilityVer);
|
||||
pHead->version = TRANS_VER;
|
||||
|
||||
// handle invalid drop_task resp, TD-20098
|
||||
if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "transLog.h"
|
||||
#include "trpc.h"
|
||||
#include "tutil.h"
|
||||
#include "tversion.h"
|
||||
|
||||
typedef struct {
|
||||
int index;
|
||||
|
@ -155,7 +156,7 @@ int main(int argc, char *argv[]) {
|
|||
}
|
||||
|
||||
initLogEnv();
|
||||
|
||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||
void *pRpc = rpcOpen(&rpcInit);
|
||||
if (pRpc == NULL) {
|
||||
tError("failed to initialize RPC");
|
||||
|
|
|
@ -13,12 +13,13 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
//#define _DEFAULT_SOURCE
|
||||
// #define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "tglobal.h"
|
||||
#include "tqueue.h"
|
||||
#include "transLog.h"
|
||||
#include "trpc.h"
|
||||
#include "tversion.h"
|
||||
|
||||
int msgSize = 128;
|
||||
int commit = 0;
|
||||
|
@ -151,6 +152,8 @@ int main(int argc, char *argv[]) {
|
|||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.cfp = processRequestMsg;
|
||||
rpcInit.idleTime = 2 * 1500;
|
||||
|
||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||
rpcDebugFlag = 131;
|
||||
|
||||
for (int i = 1; i < argc; ++i) {
|
||||
|
@ -187,7 +190,7 @@ int main(int argc, char *argv[]) {
|
|||
rpcInit.connType = TAOS_CONN_SERVER;
|
||||
|
||||
initLogEnv();
|
||||
|
||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||
void *pRpc = rpcOpen(&rpcInit);
|
||||
if (pRpc == NULL) {
|
||||
tError("failed to start RPC server");
|
||||
|
|
|
@ -18,10 +18,10 @@
|
|||
#include "tdatablock.h"
|
||||
#include "tglobal.h"
|
||||
#include "tlog.h"
|
||||
#include "tmisce.h"
|
||||
#include "transLog.h"
|
||||
#include "trpc.h"
|
||||
#include "tmisce.h"
|
||||
|
||||
#include "tversion.h"
|
||||
using namespace std;
|
||||
|
||||
const char *label = "APP";
|
||||
|
@ -54,6 +54,8 @@ class Client {
|
|||
rpcInit_.user = (char *)user;
|
||||
rpcInit_.parent = this;
|
||||
rpcInit_.connType = TAOS_CONN_CLIENT;
|
||||
|
||||
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
|
||||
this->transCli = rpcOpen(&rpcInit_);
|
||||
tsem_init(&this->sem, 0, 0);
|
||||
}
|
||||
|
@ -66,6 +68,7 @@ class Client {
|
|||
void Restart(CB cb) {
|
||||
rpcClose(this->transCli);
|
||||
rpcInit_.cfp = cb;
|
||||
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
|
||||
this->transCli = rpcOpen(&rpcInit_);
|
||||
}
|
||||
void Stop() {
|
||||
|
@ -117,6 +120,7 @@ class Server {
|
|||
rpcInit_.cfp = processReq;
|
||||
rpcInit_.user = (char *)user;
|
||||
rpcInit_.connType = TAOS_CONN_SERVER;
|
||||
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
|
||||
}
|
||||
void Start() {
|
||||
this->transSrv = rpcOpen(&this->rpcInit_);
|
||||
|
|
|
@ -151,9 +151,10 @@ class TDTestCase:
|
|||
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '")
|
||||
os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql")
|
||||
|
||||
cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;"
|
||||
if os.system(cmd) == 0:
|
||||
raise Exception("failed to execute system command. cmd: %s" % cmd)
|
||||
# cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;"
|
||||
# tdLog.info(f"new client version connect to old version taosd, commad return value:{cmd}")
|
||||
# if os.system(cmd) == 0:
|
||||
# raise Exception("failed to execute system command. cmd: %s" % cmd)
|
||||
|
||||
os.system("pkill taosd") # make sure all the data are saved in disk.
|
||||
self.checkProcessPid("taosd")
|
||||
|
|
|
@ -256,6 +256,7 @@ class TDTestCase:
|
|||
tdLog.info("all consumers status into 'lost'")
|
||||
|
||||
# drop consumer groups
|
||||
tdLog.info("drop all consumers")
|
||||
for i in range(len(groupIdList)):
|
||||
for j in range(len(topicNameList)):
|
||||
sqlCmd = f"drop consumer group `%s` on %s"%(groupIdList[i], topicNameList[j])
|
||||
|
@ -265,7 +266,17 @@ class TDTestCase:
|
|||
tmqCom.g_end_insert_flag = 1
|
||||
tdLog.debug("notify sub-thread to stop insert data")
|
||||
pThread.join()
|
||||
|
||||
tdSql.query('show consumers;')
|
||||
consumerNUm = tdSql.queryRows
|
||||
|
||||
tdSql.query('show subscriptions;')
|
||||
subscribeNum = tdSql.queryRows
|
||||
|
||||
if (0 != consumerNUm or 0 != subscribeNum):
|
||||
tdLog.exit("drop consumer fail! consumerNUm %d, subscribeNum: %d"%(consumerNUm, subscribeNum))
|
||||
|
||||
tdLog.info("drop consuer success, there is no consumers and subscribes")
|
||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||
|
||||
def run(self):
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#define _GNU_SOURCE
|
||||
#include "shellInt.h"
|
||||
#include "tversion.h"
|
||||
|
||||
static void shellWorkAsClient() {
|
||||
SShellArgs *pArgs = &shell.args;
|
||||
|
@ -33,6 +34,7 @@ static void shellWorkAsClient() {
|
|||
rpcInit.user = "_dnd";
|
||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||
|
||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||
clientRpc = rpcOpen(&rpcInit);
|
||||
if (clientRpc == NULL) {
|
||||
printf("failed to init net test client since %s\r\n", terrstr());
|
||||
|
@ -123,6 +125,8 @@ static void shellWorkAsServer() {
|
|||
rpcInit.connType = TAOS_CONN_SERVER;
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||
|
||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||
|
||||
void *serverRpc = rpcOpen(&rpcInit);
|
||||
if (serverRpc == NULL) {
|
||||
printf("failed to init net test server since %s\r\n", terrstr());
|
||||
|
|
Loading…
Reference in New Issue