feat: add new api
This commit is contained in:
parent
33452da7bd
commit
abc69e1e2b
|
@ -128,7 +128,6 @@ typedef struct setConfRet {
|
||||||
|
|
||||||
DLL_EXPORT void taos_cleanup(void);
|
DLL_EXPORT void taos_cleanup(void);
|
||||||
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
|
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
|
||||||
DLL_EXPORT setConfRet taos_set_config(const char *config);
|
|
||||||
DLL_EXPORT int taos_init(void);
|
DLL_EXPORT int taos_init(void);
|
||||||
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
|
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
|
||||||
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
|
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
|
||||||
|
|
|
@ -149,6 +149,7 @@ void taosCfgDynamicOptions(const char *option, const char *value);
|
||||||
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary);
|
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary);
|
||||||
|
|
||||||
struct SConfig *taosGetCfg();
|
struct SConfig *taosGetCfg();
|
||||||
|
int32_t taosSetCfg(SConfig *pCfg, char* name);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,6 +57,8 @@ void tNameAssign(SName* dst, const SName* src);
|
||||||
|
|
||||||
int32_t tNameSetDbName(SName* dst, int32_t acctId, const char* dbName, size_t nameLen);
|
int32_t tNameSetDbName(SName* dst, int32_t acctId, const char* dbName, size_t nameLen);
|
||||||
|
|
||||||
|
int32_t tNameAddTbName(SName* dst, const char* tbName, size_t nameLen);
|
||||||
|
|
||||||
int32_t tNameFromString(SName* dst, const char* str, uint32_t type);
|
int32_t tNameFromString(SName* dst, const char* str, uint32_t type);
|
||||||
|
|
||||||
int32_t tNameSetAcctId(SName* dst, int32_t acctId);
|
int32_t tNameSetAcctId(SName* dst, int32_t acctId);
|
||||||
|
|
|
@ -266,19 +266,19 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
|
||||||
#define qDebug(...) \
|
#define qDebug(...) \
|
||||||
do { \
|
do { \
|
||||||
if (qDebugFlag & DEBUG_DEBUG) { \
|
if (qDebugFlag & DEBUG_DEBUG) { \
|
||||||
taosPrintLog("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
|
taosPrintLog("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
#define qTrace(...) \
|
#define qTrace(...) \
|
||||||
do { \
|
do { \
|
||||||
if (qDebugFlag & DEBUG_TRACE) { \
|
if (qDebugFlag & DEBUG_TRACE) { \
|
||||||
taosPrintLog("QRY ", DEBUG_TRACE, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
|
taosPrintLog("QRY ", DEBUG_TRACE, qDebugFlag, __VA_ARGS__); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
#define qDebugL(...) \
|
#define qDebugL(...) \
|
||||||
do { \
|
do { \
|
||||||
if (qDebugFlag & DEBUG_DEBUG) { \
|
if (qDebugFlag & DEBUG_DEBUG) { \
|
||||||
taosPrintLongString("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
|
taosPrintLongString("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
|
|
@ -215,6 +215,7 @@ typedef struct SRequestObj {
|
||||||
SQueryExecMetric metric;
|
SQueryExecMetric metric;
|
||||||
SRequestSendRecvBody body;
|
SRequestSendRecvBody body;
|
||||||
bool stableQuery;
|
bool stableQuery;
|
||||||
|
bool validateOnly;
|
||||||
|
|
||||||
bool killed;
|
bool killed;
|
||||||
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
|
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
|
||||||
|
@ -235,7 +236,12 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveT
|
||||||
bool freeAfterUse);
|
bool freeAfterUse);
|
||||||
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
|
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
|
||||||
void doFreeReqResultInfo(SReqResultInfo* pResInfo);
|
void doFreeReqResultInfo(SReqResultInfo* pResInfo);
|
||||||
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen);
|
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq);
|
||||||
|
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code);
|
||||||
|
|
||||||
|
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly);
|
||||||
|
TAOS_RES *taosQueryImpl(TAOS *taos, const char *sql, bool validateOnly);
|
||||||
|
void taosAsyncQueryImpl(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param, bool validateOnly);
|
||||||
|
|
||||||
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
|
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
|
||||||
SMqRspObj* msg = (SMqRspObj*)res;
|
SMqRspObj* msg = (SMqRspObj*)res;
|
||||||
|
@ -301,7 +307,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
|
||||||
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
||||||
uint16_t port, int connType);
|
uint16_t port, int connType);
|
||||||
|
|
||||||
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen);
|
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly);
|
||||||
|
|
||||||
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
|
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
|
||||||
|
|
||||||
|
|
|
@ -237,6 +237,10 @@ static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscOb
|
||||||
|
|
||||||
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
|
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
SRetrieveTableRsp* pRsp = NULL;
|
SRetrieveTableRsp* pRsp = NULL;
|
||||||
|
if (pRequest->validateOnly) {
|
||||||
|
pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
|
int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
|
||||||
|
@ -261,6 +265,11 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
|
if (pRequest->validateOnly) {
|
||||||
|
pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
// drop table if exists not_exists_table
|
// drop table if exists not_exists_table
|
||||||
if (NULL == pQuery->pCmdMsg) {
|
if (NULL == pQuery->pCmdMsg) {
|
||||||
pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
|
pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
|
||||||
|
@ -276,8 +285,11 @@ int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
|
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
|
int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
|
||||||
return TSDB_CODE_SUCCESS;
|
if (code) {
|
||||||
|
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
|
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
|
||||||
|
@ -851,15 +863,19 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
|
||||||
|
|
||||||
switch (pQuery->execMode) {
|
switch (pQuery->execMode) {
|
||||||
case QUERY_EXEC_MODE_LOCAL:
|
case QUERY_EXEC_MODE_LOCAL:
|
||||||
code = execLocalCmd(pRequest, pQuery);
|
if (!pRequest->validateOnly) {
|
||||||
|
code = execLocalCmd(pRequest, pQuery);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case QUERY_EXEC_MODE_RPC:
|
case QUERY_EXEC_MODE_RPC:
|
||||||
code = execDdlQuery(pRequest, pQuery);
|
if (!pRequest->validateOnly) {
|
||||||
|
code = execDdlQuery(pRequest, pQuery);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case QUERY_EXEC_MODE_SCHEDULE: {
|
case QUERY_EXEC_MODE_SCHEDULE: {
|
||||||
SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||||
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pMnodeList);
|
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pMnodeList);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
|
||||||
SArray* pNodeList = NULL;
|
SArray* pNodeList = NULL;
|
||||||
buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
|
buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
|
||||||
|
|
||||||
|
@ -894,7 +910,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
|
||||||
return pRequest;
|
return pRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
|
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly) {
|
||||||
SRequestObj* pRequest = NULL;
|
SRequestObj* pRequest = NULL;
|
||||||
SQuery* pQuery = NULL;
|
SQuery* pQuery = NULL;
|
||||||
|
|
||||||
|
@ -904,6 +920,8 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pRequest->validateOnly = validateOnly;
|
||||||
|
|
||||||
code = parseSql(pRequest, false, &pQuery, NULL);
|
code = parseSql(pRequest, false, &pQuery, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pRequest->code = code;
|
pRequest->code = code;
|
||||||
|
@ -945,7 +963,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultM
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
|
||||||
SArray* pNodeList = NULL;
|
SArray* pNodeList = NULL;
|
||||||
buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
|
buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
|
||||||
|
|
||||||
|
@ -962,7 +980,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultM
|
||||||
code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
|
code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
|
||||||
taosArrayDestroy(pNodeList);
|
taosArrayDestroy(pNodeList);
|
||||||
} else {
|
} else {
|
||||||
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
tscDebug("0x%" PRIx64 " plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||||
}
|
}
|
||||||
|
@ -1045,14 +1063,14 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
|
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly) {
|
||||||
SRequestObj* pRequest = NULL;
|
SRequestObj* pRequest = NULL;
|
||||||
int32_t retryNum = 0;
|
int32_t retryNum = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
destroyRequest(pRequest);
|
destroyRequest(pRequest);
|
||||||
pRequest = launchQuery(pTscObj, sql, sqlLen);
|
pRequest = launchQuery(pTscObj, sql, sqlLen, validateOnly);
|
||||||
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
|
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1819,3 +1837,251 @@ _OVER:
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str, int32_t acctId, char* db) {
|
||||||
|
SName name;
|
||||||
|
|
||||||
|
if (len1 <= 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
const char *dbName = db;
|
||||||
|
const char *tbName = NULL;
|
||||||
|
int32_t dbLen = 0;
|
||||||
|
int32_t tbLen = 0;
|
||||||
|
if (len2 > 0) {
|
||||||
|
dbName = str + pos1;
|
||||||
|
dbLen = len1;
|
||||||
|
tbName = str + pos2;
|
||||||
|
tbLen = len2;
|
||||||
|
} else {
|
||||||
|
dbLen = strlen(db);
|
||||||
|
tbName = str + pos1;
|
||||||
|
tbLen = len1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tNameAddTbName(&name, tbName, tbLen)) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pList, &name);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
|
||||||
|
*pReq = taosArrayInit(10, sizeof(SName));
|
||||||
|
if (NULL == *pReq) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool inEscape = false;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
int32_t vIdx = 0;
|
||||||
|
int32_t vPos[2];
|
||||||
|
int32_t vLen[2];
|
||||||
|
|
||||||
|
memset(vPos, -1, sizeof(vPos));
|
||||||
|
memset(vLen, 0, sizeof(vLen));
|
||||||
|
|
||||||
|
for (int32_t i = 0; ; ++i) {
|
||||||
|
if (0 == *(tbList + i)) {
|
||||||
|
if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
|
||||||
|
vLen[vIdx] = i - vPos[vIdx];
|
||||||
|
}
|
||||||
|
|
||||||
|
code = appendTbToReq(*pReq, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
|
||||||
|
if (code) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ('`' == *(tbList + i)) {
|
||||||
|
inEscape = !inEscape;
|
||||||
|
if (!inEscape) {
|
||||||
|
if (vPos[vIdx] >= 0) {
|
||||||
|
vLen[vIdx] = i - vPos[vIdx];
|
||||||
|
} else {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (inEscape) {
|
||||||
|
if (vPos[vIdx] < 0) {
|
||||||
|
vPos[vIdx] = i;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ('.' == *(tbList + i)) {
|
||||||
|
if (vPos[vIdx] < 0) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
if (vLen[vIdx] <= 0) {
|
||||||
|
vLen[vIdx] = i - vPos[vIdx];
|
||||||
|
}
|
||||||
|
vIdx++;
|
||||||
|
if (vIdx >= 2) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (',' == *(tbList + i)) {
|
||||||
|
if (vPos[vIdx] < 0) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
if (vLen[vIdx] <= 0) {
|
||||||
|
vLen[vIdx] = i - vPos[vIdx];
|
||||||
|
}
|
||||||
|
|
||||||
|
code = appendTbToReq(*pReq, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
|
||||||
|
if (code) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(vPos, -1, sizeof(vPos));
|
||||||
|
memset(vLen, 0, sizeof(vLen));
|
||||||
|
vIdx = 0;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (' ' == *(tbList + i) || '\r' == *(tbList + i) || '\t' == *(tbList + i) || '\n' == *(tbList + i)) {
|
||||||
|
if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
|
||||||
|
vLen[vIdx] = i - vPos[vIdx];
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) ||
|
||||||
|
('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
|
||||||
|
('0' <= *(tbList + i) && '9' >= *(tbList + i))) {
|
||||||
|
if (vLen[vIdx] > 0) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
if (vPos[vIdx] < 0) {
|
||||||
|
vPos[vIdx] = i;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_TSC_INVALID_OPERATION;
|
||||||
|
|
||||||
|
taosArrayDestroy(*pReq);
|
||||||
|
*pReq = NULL;
|
||||||
|
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
|
||||||
|
SSyncQueryParam *pParam = param;
|
||||||
|
pParam->pRequest->code = code;
|
||||||
|
|
||||||
|
tsem_post(&pParam->sem);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void syncQueryFn(void *param, void *res, int32_t code) {
|
||||||
|
SSyncQueryParam *pParam = param;
|
||||||
|
pParam->pRequest = res;
|
||||||
|
pParam->pRequest->code = code;
|
||||||
|
|
||||||
|
tsem_post(&pParam->sem);
|
||||||
|
}
|
||||||
|
|
||||||
|
void taosAsyncQueryImpl(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param, bool validateOnly) {
|
||||||
|
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
|
||||||
|
if (pTscObj == NULL || sql == NULL || NULL == fp) {
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
if (pTscObj) {
|
||||||
|
releaseTscObj(*(int64_t *)taos);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
}
|
||||||
|
fp(param, NULL, terrno);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t sqlLen = strlen(sql);
|
||||||
|
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
|
||||||
|
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
|
||||||
|
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
|
||||||
|
|
||||||
|
fp(param, NULL, terrno);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRequestObj *pRequest = NULL;
|
||||||
|
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
fp(param, NULL, terrno);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRequest->validateOnly = validateOnly;
|
||||||
|
pRequest->body.queryFp = fp;
|
||||||
|
pRequest->body.param = param;
|
||||||
|
doAsyncQuery(pRequest, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
TAOS_RES *taosQueryImpl(TAOS *taos, const char *sql, bool validateOnly) {
|
||||||
|
if (NULL == taos) {
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
|
||||||
|
if (pTscObj == NULL || sql == NULL) {
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if SYNC_ON_TOP_OF_ASYNC
|
||||||
|
SSyncQueryParam *param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
|
||||||
|
tsem_init(¶m->sem, 0, 0);
|
||||||
|
|
||||||
|
taosAsyncQueryImpl(taos, sql, syncQueryFn, param, validateOnly);
|
||||||
|
tsem_wait(¶m->sem);
|
||||||
|
|
||||||
|
releaseTscObj(*(int64_t *)taos);
|
||||||
|
|
||||||
|
return param->pRequest;
|
||||||
|
#else
|
||||||
|
size_t sqlLen = strlen(sql);
|
||||||
|
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
|
||||||
|
releaseTscObj(*(int64_t *)taos);
|
||||||
|
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
|
||||||
|
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_RES *pRes = execQuery(pTscObj, sql, sqlLen, validateOnly);
|
||||||
|
|
||||||
|
releaseTscObj(*(int64_t *)taos);
|
||||||
|
|
||||||
|
return pRes;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -81,12 +81,6 @@ void taos_cleanup(void) {
|
||||||
taosCloseLog();
|
taosCloseLog();
|
||||||
}
|
}
|
||||||
|
|
||||||
setConfRet taos_set_config(const char *config) {
|
|
||||||
// TODO
|
|
||||||
setConfRet ret = {SET_CONF_RET_SUCC, {0}};
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
|
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
|
||||||
tscDebug("try to connect to %s:%u, user:%s db:%s", ip, port, user, db);
|
tscDebug("try to connect to %s:%u, user:%s db:%s", ip, port, user, db);
|
||||||
if (user == NULL) {
|
if (user == NULL) {
|
||||||
|
@ -205,51 +199,9 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
|
||||||
return pResInfo->userFields;
|
return pResInfo->userFields;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncQueryFn(void *param, void *res, int32_t code) {
|
|
||||||
SSyncQueryParam *pParam = param;
|
|
||||||
pParam->pRequest = res;
|
|
||||||
pParam->pRequest->code = code;
|
|
||||||
|
|
||||||
tsem_post(&pParam->sem);
|
|
||||||
}
|
|
||||||
|
|
||||||
TAOS_RES *taos_query(TAOS *taos, const char *sql) {
|
TAOS_RES *taos_query(TAOS *taos, const char *sql) {
|
||||||
if (NULL == taos) {
|
return taosQueryImpl(taos, sql, false);
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
|
|
||||||
if (pTscObj == NULL || sql == NULL) {
|
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
#if SYNC_ON_TOP_OF_ASYNC
|
|
||||||
SSyncQueryParam *param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
|
|
||||||
tsem_init(¶m->sem, 0, 0);
|
|
||||||
|
|
||||||
taos_query_a(taos, sql, syncQueryFn, param);
|
|
||||||
tsem_wait(¶m->sem);
|
|
||||||
|
|
||||||
releaseTscObj(*(int64_t *)taos);
|
|
||||||
|
|
||||||
return param->pRequest;
|
|
||||||
#else
|
|
||||||
size_t sqlLen = strlen(sql);
|
|
||||||
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
|
|
||||||
releaseTscObj(*(int64_t *)taos);
|
|
||||||
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
|
|
||||||
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
TAOS_RES *pRes = execQuery(pTscObj, sql, sqlLen);
|
|
||||||
|
|
||||||
releaseTscObj(*(int64_t *)taos);
|
|
||||||
|
|
||||||
return pRes;
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
||||||
|
@ -639,7 +591,14 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
|
||||||
return pResInfo->pCol[columnIndex].offset;
|
return pResInfo->pCol[columnIndex].offset;
|
||||||
}
|
}
|
||||||
|
|
||||||
int taos_validate_sql(TAOS *taos, const char *sql) { return true; }
|
int taos_validate_sql(TAOS *taos, const char *sql) {
|
||||||
|
TAOS_RES* pObj = taosQueryImpl(taos, sql, true);
|
||||||
|
|
||||||
|
int code = taos_errno(pObj);
|
||||||
|
|
||||||
|
taos_free_result(pObj);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
void taos_reset_current_db(TAOS *taos) {
|
void taos_reset_current_db(TAOS *taos) {
|
||||||
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
|
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
|
||||||
|
@ -729,38 +688,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
|
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
|
||||||
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
|
taosAsyncQueryImpl(taos, sql, fp, param, false);
|
||||||
if (pTscObj == NULL || sql == NULL || NULL == fp) {
|
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
|
||||||
if (pTscObj) {
|
|
||||||
releaseTscObj(*(int64_t *)taos);
|
|
||||||
} else {
|
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
|
||||||
}
|
|
||||||
fp(param, NULL, terrno);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t sqlLen = strlen(sql);
|
|
||||||
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
|
|
||||||
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
|
|
||||||
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
|
|
||||||
|
|
||||||
fp(param, NULL, terrno);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SRequestObj *pRequest = NULL;
|
|
||||||
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
terrno = code;
|
|
||||||
fp(param, NULL, terrno);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
pRequest->body.queryFp = fp;
|
|
||||||
pRequest->body.param = param;
|
|
||||||
doAsyncQuery(pRequest, false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
|
int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
|
||||||
|
@ -953,10 +881,75 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
||||||
// TODO
|
const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
|
||||||
return -1;
|
int32_t code = 0;
|
||||||
|
SRequestObj *pRequest = NULL;
|
||||||
|
SCatalogReq catalogReq = {0};
|
||||||
|
|
||||||
|
if (NULL == tableNameList) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t length = (int32_t)strlen(tableNameList);
|
||||||
|
if (0 == length) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else if (length > MAX_TABLE_NAME_LENGTH) {
|
||||||
|
tscError("tableNameList too long, length:%d, maximum allowed:%d", length, MAX_TABLE_NAME_LENGTH);
|
||||||
|
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||||
|
}
|
||||||
|
|
||||||
|
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
|
||||||
|
if (pTscObj == NULL) {
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = transferTableNameList(tableNameList, pTscObj->acctId, pTscObj->db, &catalogReq.pTableMeta);
|
||||||
|
if (code) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SCatalog* pCtg = NULL;
|
||||||
|
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* sql = "taos_load_table_info";
|
||||||
|
code = buildRequest(pTscObj, sql, strlen(sql), &pRequest);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncQueryParam param = {0};
|
||||||
|
tsem_init(¶m.sem, 0, 0);
|
||||||
|
param.pRequest = pRequest;
|
||||||
|
|
||||||
|
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
|
||||||
|
.requestId = pRequest->requestId,
|
||||||
|
.requestObjRefId = pRequest->self};
|
||||||
|
|
||||||
|
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
||||||
|
|
||||||
|
code = catalogAsyncGetAllMeta(pCtg, &conn, pRequest->requestId, &catalogReq, syncCatalogFn, ¶m, NULL);
|
||||||
|
if (code) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsem_wait(¶m.sem);
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
taosArrayDestroy(catalogReq.pTableMeta);
|
||||||
|
destroyRequest(pRequest);
|
||||||
|
|
||||||
|
releaseTscObj(*(int64_t *)taos);
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
TAOS_STMT *taos_stmt_init(TAOS *taos) {
|
TAOS_STMT *taos_stmt_init(TAOS *taos) {
|
||||||
STscObj *pObj = acquireTscObj(*(int64_t *)taos);
|
STscObj *pObj = acquireTscObj(*(int64_t *)taos);
|
||||||
if (NULL == pObj) {
|
if (NULL == pObj) {
|
||||||
|
|
|
@ -629,6 +629,373 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t taosSetCfg(SConfig *pCfg, char* name) {
|
||||||
|
int32_t len = strlen(name);
|
||||||
|
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
|
||||||
|
strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len));
|
||||||
|
|
||||||
|
switch (lowcaseName[0]) {
|
||||||
|
case 'a': {
|
||||||
|
if (strcasecmp("asyncLog", name) == 0) {
|
||||||
|
tsAsyncLog = cfgGetItem(pCfg, "asyncLog")->bval;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'b': {
|
||||||
|
if (strcasecmp("bnodeShmSize", name) == 0) {
|
||||||
|
tsBnodeShmSize = cfgGetItem(pCfg, "bnodeShmSize")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'c': {
|
||||||
|
if (strcasecmp("charset", name) == 0) {
|
||||||
|
const char *locale = cfgGetItem(pCfg, "locale")->str;
|
||||||
|
const char *charset = cfgGetItem(pCfg, "charset")->str;
|
||||||
|
taosSetSystemLocale(locale, charset);
|
||||||
|
osSetSystemLocale(locale, charset);
|
||||||
|
} else if (strcasecmp("compressMsgSize", name) == 0) {
|
||||||
|
tsCompressMsgSize = cfgGetItem(pCfg, "compressMsgSize")->i32;
|
||||||
|
} else if (strcasecmp("compressColData", name) == 0) {
|
||||||
|
tsCompressColData = cfgGetItem(pCfg, "compressColData")->i32;
|
||||||
|
} else if (strcasecmp("countAlwaysReturnValue", name) == 0) {
|
||||||
|
tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32;
|
||||||
|
} else if (strcasecmp("cDebugFlag", name) == 0) {
|
||||||
|
cDebugFlag = cfgGetItem(pCfg, "cDebugFlag")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'd': {
|
||||||
|
if (strcasecmp("deadLockKillQuery", name) == 0) {
|
||||||
|
tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->i32;
|
||||||
|
} else if (strcasecmp("dDebugFlag", name) == 0) {
|
||||||
|
dDebugFlag = cfgGetItem(pCfg, "dDebugFlag")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'e': {
|
||||||
|
if (strcasecmp("enableCoreFile", name) == 0) {
|
||||||
|
bool enableCore = cfgGetItem(pCfg, "enableCoreFile")->bval;
|
||||||
|
taosSetConsoleEcho(enableCore);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'f': {
|
||||||
|
if (strcasecmp("fqdn", name) == 0) {
|
||||||
|
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
|
||||||
|
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
||||||
|
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
|
||||||
|
|
||||||
|
char defaultFirstEp[TSDB_EP_LEN] = {0};
|
||||||
|
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
|
||||||
|
|
||||||
|
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
|
||||||
|
SEp firstEp = {0};
|
||||||
|
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
||||||
|
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
|
||||||
|
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype);
|
||||||
|
} else if (strcasecmp("firstEp", name) == 0) {
|
||||||
|
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
|
||||||
|
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
||||||
|
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
|
||||||
|
|
||||||
|
char defaultFirstEp[TSDB_EP_LEN] = {0};
|
||||||
|
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
|
||||||
|
|
||||||
|
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
|
||||||
|
SEp firstEp = {0};
|
||||||
|
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
||||||
|
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
|
||||||
|
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype);
|
||||||
|
} else if (strcasecmp("fsDebugFlag", name) == 0) {
|
||||||
|
fsDebugFlag = cfgGetItem(pCfg, "fsDebugFlag")->i32;
|
||||||
|
} else if (strcasecmp("fnDebugFlag", name) == 0) {
|
||||||
|
fnDebugFlag = cfgGetItem(pCfg, "fnDebugFlag")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'i': {
|
||||||
|
if (strcasecmp("idxDebugFlag", name) == 0) {
|
||||||
|
idxDebugFlag = cfgGetItem(pCfg, "idxDebugFlag")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'j': {
|
||||||
|
if (strcasecmp("jniDebugFlag", name) == 0) {
|
||||||
|
jniDebugFlag = cfgGetItem(pCfg, "jniDebugFlag")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'k': {
|
||||||
|
if (strcasecmp("keepColumnName", name) == 0) {
|
||||||
|
tsKeepOriginalColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'l': {
|
||||||
|
if (strcasecmp("locale", name) == 0) {
|
||||||
|
const char *locale = cfgGetItem(pCfg, "locale")->str;
|
||||||
|
const char *charset = cfgGetItem(pCfg, "charset")->str;
|
||||||
|
taosSetSystemLocale(locale, charset);
|
||||||
|
osSetSystemLocale(locale, charset);
|
||||||
|
} else if (strcasecmp("logDir", name) == 0) {
|
||||||
|
tstrncpy(tsLogDir, cfgGetItem(pCfg, "logDir")->str, PATH_MAX);
|
||||||
|
taosExpandDir(tsLogDir, tsLogDir, PATH_MAX);
|
||||||
|
} else if (strcasecmp("logKeepDays", name) == 0) {
|
||||||
|
tsLogKeepDays = cfgGetItem(pCfg, "logKeepDays")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'm': {
|
||||||
|
switch (lowcaseName[1]) {
|
||||||
|
case 'a': {
|
||||||
|
if (strcasecmp("maxShellConns", name) == 0) {
|
||||||
|
tsMaxShellConns = cfgGetItem(pCfg, "maxShellConns")->i32;
|
||||||
|
} else if (strcasecmp("maxNumOfDistinctRes", name) == 0) {
|
||||||
|
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
|
||||||
|
} else if (strcasecmp("maxStreamCompDelay", name) == 0) {
|
||||||
|
tsMaxStreamComputDelay = cfgGetItem(pCfg, "maxStreamCompDelay")->i32;
|
||||||
|
} else if (strcasecmp("maxFirstStreamCompDelay", name) == 0) {
|
||||||
|
tsStreamCompStartDelay = cfgGetItem(pCfg, "maxFirstStreamCompDelay")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'd': {
|
||||||
|
if (strcasecmp("mDebugFlag", name) == 0) {
|
||||||
|
mDebugFlag = cfgGetItem(pCfg, "mDebugFlag")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'i': {
|
||||||
|
if (strcasecmp("minimalTempDirGB", name) == 0) {
|
||||||
|
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTempDirGB")->fval;
|
||||||
|
} else if (strcasecmp("minimalDataDirGB", name) == 0) {
|
||||||
|
tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval;
|
||||||
|
} else if (strcasecmp("minSlidingTime", name) == 0) {
|
||||||
|
tsMinSlidingTime = cfgGetItem(pCfg, "minSlidingTime")->i32;
|
||||||
|
} else if (strcasecmp("minIntervalTime", name) == 0) {
|
||||||
|
tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32;
|
||||||
|
} else if (strcasecmp("minimalLogDirGB", name) == 0) {
|
||||||
|
tsLogSpace.reserved = cfgGetItem(pCfg, "minimalLogDirGB")->fval;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'n': {
|
||||||
|
if (strcasecmp("mnodeShmSize", name) == 0) {
|
||||||
|
tsMnodeShmSize = cfgGetItem(pCfg, "mnodeShmSize")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'o': {
|
||||||
|
if (strcasecmp("monitor", name) == 0) {
|
||||||
|
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
|
||||||
|
} else if (strcasecmp("monitorInterval", name) == 0) {
|
||||||
|
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
|
||||||
|
} else if (strcasecmp("monitorFqdn", name) == 0) {
|
||||||
|
tstrncpy(tsMonitorFqdn, cfgGetItem(pCfg, "monitorFqdn")->str, TSDB_FQDN_LEN);
|
||||||
|
} else if (strcasecmp("monitorPort", name) == 0) {
|
||||||
|
tsMonitorPort = (uint16_t)cfgGetItem(pCfg, "monitorPort")->i32;
|
||||||
|
} else if (strcasecmp("monitorMaxLogs", name) == 0) {
|
||||||
|
tsMonitorMaxLogs = cfgGetItem(pCfg, "monitorMaxLogs")->i32;
|
||||||
|
} else if (strcasecmp("monitorComp", name) == 0) {
|
||||||
|
tsMonitorComp = cfgGetItem(pCfg, "monitorComp")->bval;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'q': {
|
||||||
|
if (strcasecmp("mqRebalanceInterval", name) == 0) {
|
||||||
|
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'u': {
|
||||||
|
if (strcasecmp("multiProcess", name) == 0) {
|
||||||
|
tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
terrno = TSDB_CODE_CFG_NOT_FOUND;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'n': {
|
||||||
|
if (strcasecmp("numOfTaskQueueThreads", name) == 0) {
|
||||||
|
tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32;
|
||||||
|
} else if (strcasecmp("numOfRpcThreads", name) == 0) {
|
||||||
|
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
|
||||||
|
} else if (strcasecmp("numOfCommitThreads", name) == 0) {
|
||||||
|
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
|
||||||
|
} else if (strcasecmp("numOfMnodeQueryThreads", name) == 0) {
|
||||||
|
tsNumOfMnodeQueryThreads = cfgGetItem(pCfg, "numOfMnodeQueryThreads")->i32;
|
||||||
|
} else if (strcasecmp("numOfMnodeReadThreads", name) == 0) {
|
||||||
|
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
|
||||||
|
} else if (strcasecmp("numOfVnodeQueryThreads", name) == 0) {
|
||||||
|
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
|
||||||
|
} else if (strcasecmp("numOfVnodeFetchThreads", name) == 0) {
|
||||||
|
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
|
||||||
|
} else if (strcasecmp("numOfVnodeWriteThreads", name) == 0) {
|
||||||
|
tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32;
|
||||||
|
} else if (strcasecmp("numOfVnodeSyncThreads", name) == 0) {
|
||||||
|
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
|
||||||
|
} else if (strcasecmp("numOfVnodeMergeThreads", name) == 0) {
|
||||||
|
tsNumOfVnodeMergeThreads = cfgGetItem(pCfg, "numOfVnodeMergeThreads")->i32;
|
||||||
|
} else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) {
|
||||||
|
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
|
||||||
|
} else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) {
|
||||||
|
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
|
||||||
|
} else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) {
|
||||||
|
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
|
||||||
|
} else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) {
|
||||||
|
tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
|
||||||
|
} else if (strcasecmp("numOfLogLines", name) == 0) {
|
||||||
|
tsNumOfLogLines = cfgGetItem(pCfg, "numOfLogLines")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'p': {
|
||||||
|
if (strcasecmp("printAuth", name) == 0) {
|
||||||
|
tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'q': {
|
||||||
|
if (strcasecmp("queryPolicy", name) == 0) {
|
||||||
|
tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32;
|
||||||
|
} else if (strcasecmp("querySmaOptimize", name) == 0) {
|
||||||
|
tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32;
|
||||||
|
} else if (strcasecmp("queryBufferSize", name) == 0) {
|
||||||
|
tsQueryBufferSize = cfgGetItem(pCfg, "queryBufferSize")->i32;
|
||||||
|
if (tsQueryBufferSize >= 0) {
|
||||||
|
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
|
||||||
|
}
|
||||||
|
} else if (strcasecmp("qnodeShmSize", name) == 0) {
|
||||||
|
tsQnodeShmSize = cfgGetItem(pCfg, "qnodeShmSize")->i32;
|
||||||
|
} else if (strcasecmp("qDebugFlag", name) == 0) {
|
||||||
|
qDebugFlag = cfgGetItem(pCfg, "qDebugFlag")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'r': {
|
||||||
|
if (strcasecmp("retryStreamCompDelay", name) == 0) {
|
||||||
|
tsRetryStreamCompDelay = cfgGetItem(pCfg, "retryStreamCompDelay")->i32;
|
||||||
|
} else if (strcasecmp("retrieveBlockingModel", name) == 0) {
|
||||||
|
tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval;
|
||||||
|
} else if (strcasecmp("rpcQueueMemoryAllowed", name) == 0) {
|
||||||
|
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
|
||||||
|
} else if (strcasecmp("rpcDebugFlag", name) == 0) {
|
||||||
|
rpcDebugFlag = cfgGetItem(pCfg, "rpcDebugFlag")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 's': {
|
||||||
|
if (strcasecmp("secondEp", name) == 0) {
|
||||||
|
SConfigItem *pSecondpItem = cfgGetItem(pCfg, "secondEp");
|
||||||
|
SEp secondEp = {0};
|
||||||
|
taosGetFqdnPortFromEp(strlen(pSecondpItem->str) == 0 ? tsFirst : pSecondpItem->str, &secondEp);
|
||||||
|
snprintf(tsSecond, sizeof(tsSecond), "%s:%u", secondEp.fqdn, secondEp.port);
|
||||||
|
cfgSetItem(pCfg, "secondEp", tsSecond, pSecondpItem->stype);
|
||||||
|
} else if (strcasecmp("smlChildTableName", name) == 0) {
|
||||||
|
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
|
||||||
|
} else if (strcasecmp("smlTagName", name) == 0) {
|
||||||
|
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
||||||
|
} else if (strcasecmp("smlDataFormat", name) == 0) {
|
||||||
|
tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
|
||||||
|
} else if (strcasecmp("shellActivityTimer", name) == 0) {
|
||||||
|
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;
|
||||||
|
} else if (strcasecmp("supportVnodes", name) == 0) {
|
||||||
|
tsNumOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32;
|
||||||
|
} else if (strcasecmp("statusInterval", name) == 0) {
|
||||||
|
tsStatusInterval = cfgGetItem(pCfg, "statusInterval")->i32;
|
||||||
|
} else if (strcasecmp("streamCompDelayRatio", name) == 0) {
|
||||||
|
tsStreamComputDelayRatio = cfgGetItem(pCfg, "streamCompDelayRatio")->fval;
|
||||||
|
} else if (strcasecmp("slaveQuery", name) == 0) {
|
||||||
|
tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval;
|
||||||
|
} else if (strcasecmp("snodeShmSize", name) == 0) {
|
||||||
|
tsSnodeShmSize = cfgGetItem(pCfg, "snodeShmSize")->i32;
|
||||||
|
} else if (strcasecmp("serverPort", name) == 0) {
|
||||||
|
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
|
||||||
|
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
||||||
|
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
|
||||||
|
|
||||||
|
char defaultFirstEp[TSDB_EP_LEN] = {0};
|
||||||
|
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
|
||||||
|
|
||||||
|
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
|
||||||
|
SEp firstEp = {0};
|
||||||
|
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
||||||
|
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
|
||||||
|
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype);
|
||||||
|
} else if (strcasecmp("sDebugFlag", name) == 0) {
|
||||||
|
sDebugFlag = cfgGetItem(pCfg, "sDebugFlag")->i32;
|
||||||
|
} else if (strcasecmp("smaDebugFlag", name) == 0) {
|
||||||
|
smaDebugFlag = cfgGetItem(pCfg, "smaDebugFlag")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 't': {
|
||||||
|
if (strcasecmp("timezone", name) == 0) {
|
||||||
|
SConfigItem *pItem = cfgGetItem(pCfg, "timezone");
|
||||||
|
osSetTimezone(pItem->str);
|
||||||
|
uDebug("timezone format changed from %s to %s", pItem->str, tsTimezoneStr);
|
||||||
|
cfgSetItem(pCfg, "timezone", tsTimezoneStr, pItem->stype);
|
||||||
|
} else if (strcasecmp("tempDir", name) == 0) {
|
||||||
|
tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX);
|
||||||
|
taosExpandDir(tsTempDir, tsTempDir, PATH_MAX);
|
||||||
|
if (taosMulMkDir(tsTempDir) != 0) {
|
||||||
|
uError("failed to create tempDir:%s since %s", tsTempDir, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
} else if (strcasecmp("telemetryReporting", name) == 0) {
|
||||||
|
tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval;
|
||||||
|
} else if (strcasecmp("telemetryInterval", name) == 0) {
|
||||||
|
tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32;
|
||||||
|
} else if (strcasecmp("telemetryServer", name) == 0) {
|
||||||
|
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
|
||||||
|
} else if (strcasecmp("telemetryPort", name) == 0) {
|
||||||
|
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
|
||||||
|
} else if (strcasecmp("transPullupInterval", name) == 0) {
|
||||||
|
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
|
||||||
|
} else if (strcasecmp("tmrDebugFlag", name) == 0) {
|
||||||
|
tmrDebugFlag = cfgGetItem(pCfg, "tmrDebugFlag")->i32;
|
||||||
|
} else if (strcasecmp("tsdbDebugFlag", name) == 0) {
|
||||||
|
tsdbDebugFlag = cfgGetItem(pCfg, "tsdbDebugFlag")->i32;
|
||||||
|
} else if (strcasecmp("tqDebugFlag", name) == 0) {
|
||||||
|
tqDebugFlag = cfgGetItem(pCfg, "tqDebugFlag")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'u': {
|
||||||
|
if (strcasecmp("udf", name) == 0) {
|
||||||
|
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
|
||||||
|
} else if (strcasecmp("uDebugFlag", name) == 0) {
|
||||||
|
uDebugFlag = cfgGetItem(pCfg, "uDebugFlag")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'v': {
|
||||||
|
if (strcasecmp("vnodeShmSize", name) == 0) {
|
||||||
|
tsVnodeShmSize = cfgGetItem(pCfg, "vnodeShmSize")->i32;
|
||||||
|
} else if (strcasecmp("vDebugFlag", name) == 0) {
|
||||||
|
vDebugFlag = cfgGetItem(pCfg, "vDebugFlag")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 'w': {
|
||||||
|
if (strcasecmp("wDebugFlag", name) == 0) {
|
||||||
|
wDebugFlag = cfgGetItem(pCfg, "wDebugFlag")->i32;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
terrno = TSDB_CODE_CFG_NOT_FOUND;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
|
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
|
||||||
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) {
|
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) {
|
||||||
osDefaultInit();
|
osDefaultInit();
|
||||||
|
|
|
@ -214,6 +214,20 @@ int32_t tNameSetDbName(SName* dst, int32_t acct, const char* dbName, size_t name
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tNameAddTbName(SName* dst, const char* tbName, size_t nameLen) {
|
||||||
|
assert(dst != NULL && tbName != NULL && nameLen > 0);
|
||||||
|
|
||||||
|
// too long account id or too long db name
|
||||||
|
if (nameLen >= tListLen(dst->tname) || nameLen <= 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dst->type = TSDB_TABLE_NAME_T;
|
||||||
|
tstrncpy(dst->tname, tbName, nameLen + 1);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t tNameSetAcctId(SName* dst, int32_t acctId) {
|
int32_t tNameSetAcctId(SName* dst, int32_t acctId) {
|
||||||
assert(dst != NULL);
|
assert(dst != NULL);
|
||||||
dst->acctId = acctId;
|
dst->acctId = acctId;
|
||||||
|
|
|
@ -555,6 +555,10 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (taosSetCfg(tsCfg, pStmt->config)) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue