Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

This commit is contained in:
wenzhouwww@live.cn 2022-05-19 14:17:04 +08:00
commit fddcd7eccd
68 changed files with 963 additions and 401 deletions

View File

@ -117,27 +117,29 @@ def pre_test(){
def pre_test_win(){
bat '''
hostname
ipconfig
set
date /t
time /t
taskkill /f /t /im python.exe
taskkill /f /t /im bash.exe
rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\debug
exit 0
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git reset --hard
git fetch || git fetch
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git reset --hard
git fetch || git fetch
git checkout -f
'''
script {
if (env.CHANGE_TARGET == 'master') {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout master
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout master
'''
@ -145,6 +147,8 @@ def pre_test_win(){
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout 2.0
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout 2.0
'''
@ -152,6 +156,8 @@ def pre_test_win(){
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout 3.0
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout 3.0
'''
@ -159,6 +165,8 @@ def pre_test_win(){
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout develop
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout develop
'''
@ -169,30 +177,52 @@ def pre_test_win(){
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git pull
git log -5
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git pull
git fetch origin +refs/pull/${CHANGE_ID}/merge
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git fetch origin +refs/pull/%CHANGE_ID%/merge
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout -qf FETCH_HEAD
git log -5
'''
} else if (env.CHANGE_URL =~ /\/TDinternal\//) {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git pull
git fetch origin +refs/pull/${CHANGE_ID}/merge
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git fetch origin +refs/pull/%CHANGE_ID%/merge
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout -qf FETCH_HEAD
git log -5
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git pull
git log -5
'''
} else {
sh '''
echo "unmatched reposiotry ${CHANGE_URL}"
bat '''
echo "unmatched reposiotry %CHANGE_URL%"
'''
}
}
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git branch
git log -5
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git branch
git log -5
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git submodule update --init --recursive
@ -205,10 +235,15 @@ def pre_test_build_win() {
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
mkdir debug
cd debug
time /t
call "C:\\Program Files (x86)\\Microsoft Visual Studio\\2017\\Community\\VC\\Auxiliary\\Build\\vcvarsall.bat" x64
set CL=/MP8
cmake .. -G "NMake Makefiles JOM"
jom -j 4 || exit 8
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cmake"
time /t
cmake .. -G "NMake Makefiles JOM" || exit 7
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> jom -j 6"
time /t
jom -j 6 || exit 8
time /t
'''
return 1
@ -226,6 +261,13 @@ pipeline {
stages {
stage('run test') {
parallel {
stage('windows test') {
agent{label " windows10_01 || windows10_02 || windows10_03 || windows10_04 "}
steps {
pre_test_win()
pre_test_build_win()
}
}
stage('linux test') {
agent{label " slave3_0 || slave15 || slave16 || slave17 "}
options { skipDefaultCheckout() }

View File

@ -5,22 +5,27 @@ IF (TD_LINUX)
ELSEIF (TD_WINDOWS)
SET(CMAKE_INSTALL_PREFIX C:/TDengine)
INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/go DESTINATION connector)
INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/nodejs DESTINATION connector)
INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/python DESTINATION connector)
INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/C\# DESTINATION connector)
INSTALL(DIRECTORY ${TD_SOURCE_DIR}/examples DESTINATION .)
# INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/go DESTINATION connector)
# INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/nodejs DESTINATION connector)
# INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/python DESTINATION connector)
# INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/C\# DESTINATION connector)
# INSTALL(DIRECTORY ${TD_SOURCE_DIR}/examples DESTINATION .)
INSTALL(FILES ${TD_SOURCE_DIR}/packaging/cfg/taos.cfg DESTINATION cfg)
INSTALL(FILES ${TD_SOURCE_DIR}/src/inc/taos.h DESTINATION include)
INSTALL(FILES ${TD_SOURCE_DIR}/src/inc/taoserror.h DESTINATION include)
INSTALL(FILES ${TD_SOURCE_DIR}/include/client/taos.h DESTINATION include)
INSTALL(FILES ${TD_SOURCE_DIR}/include/util/taoserror.h DESTINATION include)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.lib DESTINATION driver)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos_static.lib DESTINATION driver)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.exp DESTINATION driver)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.dll DESTINATION driver)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taos.exe DESTINATION .)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taosd.exe DESTINATION .)
INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/udfd.exe DESTINATION .)
IF (TD_MVN_INSTALLED)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.38-dist.jar DESTINATION connector/jdbc)
ENDIF ()
SET(TD_MAKE_INSTALL_SH "${TD_SOURCE_DIR}/packaging/tools/make_install.bat")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} :needAdmin ${TD_SOURCE_DIR} ${PROJECT_BINARY_DIR} Windows ${TD_VER_NUMBER})")
ELSEIF (TD_DARWIN)
SET(TD_MAKE_INSTALL_SH "${TD_SOURCE_DIR}/packaging/tools/make_install.sh")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")

View File

@ -106,8 +106,8 @@ int32_t create_topic() {
}
taos_free_result(pRes);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
@ -239,7 +239,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
msg_process(tmqmessage);
taos_free_result(tmqmessage);
tmq_commit(tmq, NULL, 1);
tmq_commit_async(tmq, NULL, tmq_commit_cb_print, NULL);
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
}

View File

@ -232,11 +232,11 @@ DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
#endif

View File

@ -301,6 +301,8 @@ typedef struct SSchema {
typedef struct {
int32_t nCols;
int32_t sver;
int32_t tagVer;
int32_t colVer;
SSchema* pSchema;
} SSchemaWrapper;
@ -309,6 +311,8 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p
if (pSW == NULL) return pSW;
pSW->nCols = pSchemaWrapper->nCols;
pSW->sver = pSchemaWrapper->sver;
pSW->tagVer = pSchemaWrapper->tagVer;
pSW->colVer = pSchemaWrapper->colVer;
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) {
taosMemoryFree(pSW);
@ -364,6 +368,8 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr
int32_t tlen = 0;
tlen += taosEncodeVariantI32(buf, pSW->nCols);
tlen += taosEncodeVariantI32(buf, pSW->sver);
tlen += taosEncodeVariantI32(buf, pSW->tagVer);
tlen += taosEncodeVariantI32(buf, pSW->colVer);
for (int32_t i = 0; i < pSW->nCols; i++) {
tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]);
}
@ -373,6 +379,8 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
buf = taosDecodeVariantI32(buf, &pSW->nCols);
buf = taosDecodeVariantI32(buf, &pSW->sver);
buf = taosDecodeVariantI32(buf, &pSW->tagVer);
buf = taosDecodeVariantI32(buf, &pSW->colVer);
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) {
return NULL;
@ -387,6 +395,8 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) {
if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1;
if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1;
if (tEncodeI32v(pEncoder, pSW->tagVer) < 0) return -1;
if (tEncodeI32v(pEncoder, pSW->colVer) < 0) return -1;
for (int32_t i = 0; i < pSW->nCols; i++) {
if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1;
}
@ -397,6 +407,8 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSch
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) {
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->tagVer) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->colVer) < 0) return -1;
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) return -1;
@ -410,6 +422,8 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWra
static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaWrapper* pSW) {
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->tagVer) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->colVer) < 0) return -1;
pSW->pSchema = (SSchema*)tDecoderMalloc(pDecoder, pSW->nCols * sizeof(SSchema));
if (pSW->pSchema == NULL) return -1;
@ -455,6 +469,7 @@ int32_t tDeserializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t alterType;
int32_t verInBlock;
int32_t numOfFields;
SArray* pFields;
int32_t ttl;
@ -1480,6 +1495,7 @@ typedef struct {
typedef struct {
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
char clientId[256];
SArray* topicNames; // SArray<char**>
} SCMSubscribeReq;
@ -1487,6 +1503,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
tlen += taosEncodeString(buf, pReq->cgroup);
tlen += taosEncodeString(buf, pReq->clientId);
int32_t topicNum = taosArrayGetSize(pReq->topicNames);
tlen += taosEncodeFixedI32(buf, topicNum);
@ -1500,6 +1517,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeStringTo(buf, pReq->cgroup);
buf = taosDecodeStringTo(buf, pReq->clientId);
int32_t topicNum;
buf = taosDecodeFixedI32(buf, &topicNum);

View File

@ -59,6 +59,11 @@ typedef struct SMetaData {
SArray *pQnodeList; // qnode list, SArray<SQueryNodeAddr>
} SMetaData;
typedef struct STbSVersion {
char* tbFName;
int32_t sver;
} STbSVersion;
typedef struct SCatalogCfg {
uint32_t maxTblCacheNum;
uint32_t maxDBCacheNum;
@ -165,6 +170,8 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCatalog, STableMetaRsp *rspMsg);
*/
int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName);
int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pTables);
/**
* Force refresh a table's local cached meta data.
* @param pCatalog (input, got with catalogGetHandle)

View File

@ -72,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, bool needRes, SQueryResult *pRes);
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SQueryResult *pRes);
/**
* Process the query job, generated according to the query physical plan.

View File

@ -38,6 +38,13 @@ typedef int32_t TdUcs4;
#define wcsncpy WCSNCPY_FUNC_TAOS_FORBID
#define wchar_t WCHAR_T_TYPE_TAOS_FORBID
#define strcasestr STR_CASE_STR_FORBID
#define strtoll STR_TO_LL_FUNC_TAOS_FORBID
#define strtoull STR_TO_ULL_FUNC_TAOS_FORBID
#define strtol STR_TO_L_FUNC_TAOS_FORBID
#define strtoul STR_TO_UL_FUNC_TAOS_FORBID
#define strtod STR_TO_LD_FUNC_TAOS_FORBID
#define strtold STR_TO_D_FUNC_TAOS_FORBID
#define strtof STR_TO_F_FUNC_TAOS_FORBID
#endif
#ifdef WINDOWS
@ -72,6 +79,17 @@ int32_t taosWcharsToMbs(char *pStrs, TdWchar *pWchars, int32_t size);
char *taosStrCaseStr(const char *str, const char *pattern);
int64_t taosStr2Int64(const char *str, char** pEnd, int32_t radix);
uint64_t taosStr2UInt64(const char *str, char** pEnd, int32_t radix);
int32_t taosStr2Int32(const char *str, char** pEnd, int32_t radix);
uint32_t taosStr2UInt32(const char *str, char** pEnd, int32_t radix);
int16_t taosStr2Int16(const char *str, char** pEnd, int32_t radix);
uint16_t taosStr2UInt16(const char *str, char** pEnd, int32_t radix);
int8_t taosStr2Int8(const char *str, char** pEnd, int32_t radix);
uint8_t taosStr2UInt8(const char *str, char** pEnd, int32_t radix);
double taosStr2Double(const char *str, char** pEnd);
float taosStr2Float(const char *str, char** pEnd);
#ifdef __cplusplus
}
#endif

View File

@ -56,10 +56,10 @@ typedef enum { SSkipListPutSuccess = 0, SSkipListPutEarlyStop = 1, SSkipListPutS
typedef struct SSkipList {
uint32_t seed;
uint16_t len;
__compar_fn_t comparFn;
__sl_key_fn_t keyFn;
TdThreadRwlock *lock;
uint16_t len;
uint8_t maxLevel;
uint8_t flags;
uint8_t type; // static info above

View File

@ -0,0 +1,6 @@
@echo off
goto %1
:needAdmin
mshta vbscript:createobject("shell.application").shellexecute("%~s0",":hasAdmin","","runas",1)(window.close)&goto :eof
:hasAdmin
cp -f C:\\TDengine\\driver\\taos.dll C:\\Windows\\System32

View File

@ -83,6 +83,15 @@ void closeTransporter(STscObj *pTscObj) {
rpcClose(pTscObj->pAppInfo->pTransporter);
}
static bool clientRpcRfp(int32_t code) {
if (code == TSDB_CODE_RPC_REDIRECT) {
return true;
} else {
return false;
}
}
// TODO refactor
void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
SRpcInit rpcInit;
@ -91,6 +100,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.label = "TSC";
rpcInit.numOfThreads = numOfThread;
rpcInit.cfp = processMsgFromServer;
rpcInit.rfp = clientRpcRfp;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)user;

View File

@ -291,7 +291,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
pRequest->metric.start, NULL != pRes, &res);
pRequest->metric.start, &res);
if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
@ -310,9 +310,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
}
}
if (pRes) {
*pRes = res.res;
}
*pRes = res.res;
pRequest->code = res.code;
terrno = res.code;
@ -324,7 +322,60 @@ int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList)
return getPlan(pRequest, pQuery, &pRequest->body.pDag, *pNodeList);
}
int32_t validateSversion(SRequestObj* pRequest, void* res) {
SArray* pArray = NULL;
int32_t code = 0;
if (TDMT_VND_SUBMIT == pRequest->type) {
SSubmitRsp* pRsp = (SSubmitRsp*)res;
if (pRsp->nBlocks <= 0) {
return TSDB_CODE_SUCCESS;
}
pArray = taosArrayInit(pRsp->nBlocks, sizeof(STbSVersion));
if (NULL == pArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < pRsp->nBlocks; ++i) {
SSubmitBlkRsp *blk = pRsp->pBlocks + i;
STbSVersion tbSver = {.tbFName = blk->tblFName, .sver = blk->sver};
taosArrayPush(pArray, &tbSver);
}
} else if (TDMT_VND_QUERY == pRequest->type) {
}
SCatalog* pCatalog = NULL;
CHECK_CODE_GOTO(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog), _return);
SEpSet epset = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
code = catalogChkTbMetaVersion(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &epset, pArray);
_return:
taosArrayDestroy(pArray);
return code;
}
void freeRequestRes(SRequestObj* pRequest, void* res) {
if (NULL == res) {
return;
}
if (TDMT_VND_SUBMIT == pRequest->type) {
tFreeSSubmitRsp((SSubmitRsp*)res);
} else if (TDMT_VND_QUERY == pRequest->type) {
}
}
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res) {
void* pRes = NULL;
if (TSDB_CODE_SUCCESS == code) {
switch (pQuery->execMode) {
case QUERY_EXEC_MODE_LOCAL:
@ -337,7 +388,10 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList);
if (TSDB_CODE_SUCCESS == code) {
code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList, res);
code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList, &pRes);
if (NULL != pRes) {
code = validateSversion(pRequest, pRes);
}
}
taosArrayDestroy(pNodeList);
break;
@ -356,6 +410,12 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
pRequest->code = terrno;
freeRequestRes(pRequest, pRes);
pRes = NULL;
}
if (res) {
*res = pRes;
}
return pRequest;

View File

@ -580,7 +580,7 @@ static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg){
const char *pVal = kvVal->value;
int32_t len = kvVal->length;
char *endptr = NULL;
double result = strtod(pVal, &endptr);
double result = taosStr2Double(pVal, &endptr);
if(pVal == endptr){
smlBuildInvalidDataMsg(msg, "invalid data", pVal);
return false;
@ -714,7 +714,7 @@ static bool smlIsNchar(const char *pVal, uint16_t len) {
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
char *endPtr = NULL;
int64_t tsInt64 = strtoll(value, &endPtr, 10);
int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
if(value + len != endPtr){
return -1;
}

View File

@ -263,7 +263,7 @@ static const SSysDbTableSchema topicSchema[] = {
static const SSysDbTableSchema consumerSchema[] = {
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},

View File

@ -600,6 +600,7 @@ int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq)
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->alterType) < 0) return -1;
if (tEncodeI32(&encoder, pReq->verInBlock) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfFields) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfFields; ++i) {
SField *pField = taosArrayGet(pReq->pFields, i);
@ -626,6 +627,7 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->alterType) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->verInBlock) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfFields) < 0) return -1;
pReq->pFields = taosArrayInit(pReq->numOfFields, sizeof(SField));
if (pReq->pFields == NULL) {
@ -4087,10 +4089,8 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl
if (tEncodeI32(pEncoder, pBlock->code) < 0) return -1;
if (tEncodeI8(pEncoder, pBlock->hashMeta) < 0) return -1;
if (pBlock->hashMeta) {
if (tEncodeI64(pEncoder, pBlock->uid) < 0) return -1;
if (tEncodeCStr(pEncoder, pBlock->tblFName) < 0) return -1;
}
if (tEncodeI64(pEncoder, pBlock->uid) < 0) return -1;
if (tEncodeCStr(pEncoder, pBlock->tblFName) < 0) return -1;
if (tEncodeI32v(pEncoder, pBlock->numOfRows) < 0) return -1;
if (tEncodeI32v(pEncoder, pBlock->affectedRows) < 0) return -1;
if (tEncodeI64v(pEncoder, pBlock->sver) < 0) return -1;
@ -4104,12 +4104,10 @@ static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
if (tDecodeI32(pDecoder, &pBlock->code) < 0) return -1;
if (tDecodeI8(pDecoder, &pBlock->hashMeta) < 0) return -1;
if (pBlock->hashMeta) {
if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1;
pBlock->tblFName = taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1);
if (NULL == pBlock->tblFName) return -1;
if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1;
}
if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1;
pBlock->tblFName = taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1);
if (NULL == pBlock->tblFName) return -1;
if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1;
if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1;
if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1;
if (tDecodeI64v(pDecoder, &pBlock->sver) < 0) return -1;

View File

@ -250,7 +250,7 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
return -1;
}
dst->acctId = strtoll(str, NULL, 10);
dst->acctId = taosStr2Int32(str, NULL, 10);
}
if ((type & T_NAME_DB) == T_NAME_DB) {

View File

@ -590,7 +590,7 @@ int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* dura
char* endPtr = NULL;
/* get the basic numeric value */
int64_t timestamp = strtoll(token, &endPtr, 10);
int64_t timestamp = taosStr2Int64(token, &endPtr, 10);
if (errno != 0) {
return -1;
}
@ -608,7 +608,7 @@ int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* durati
errno = 0;
/* get the basic numeric value */
*duration = strtoll(token, NULL, 10);
*duration = taosStr2Int64(token, NULL, 10);
if (errno != 0) {
return -1;
}

View File

@ -39,7 +39,7 @@ int32_t toInteger(const char *z, int32_t n, int32_t base, int64_t *value) {
errno = 0;
char *endPtr = NULL;
*value = strtoll(z, &endPtr, base);
*value = taosStr2Int64(z, &endPtr, base);
if (errno == ERANGE || errno == EINVAL || endPtr - z != n) {
errno = 0;
return -1;
@ -58,7 +58,7 @@ int32_t toUInteger(const char *z, int32_t n, int32_t base, uint64_t *value) {
return -1;
}
*value = strtoull(z, &endPtr, base);
*value = taosStr2UInt64(z, &endPtr, base);
if (errno == ERANGE || errno == EINVAL || endPtr - z != n) {
errno = 0;
return -1;
@ -434,7 +434,7 @@ static FORCE_INLINE int32_t convertToDouble(char *pStr, int32_t len, double *val
// return -1;
// }
//
// *value = strtod(pStr, NULL);
// *value = taosStr2Double(pStr, NULL);
return 0;
}
@ -911,7 +911,7 @@ int32_t taosVariantTypeSetType(SVariant *pVariant, char type) {
case TSDB_DATA_TYPE_DOUBLE: {
if (pVariant->nType == TSDB_DATA_TYPE_BINARY) {
errno = 0;
double v = strtod(pVariant->pz, NULL);
double v = taosStr2Double(pVariant->pz, NULL);
if ((errno == ERANGE && v == -1) || (isinf(v) || isnan(v))) {
taosMemoryFree(pVariant->pz);
return -1;

View File

@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "bmInt.h"
static void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {}
void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {}
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) {
SMonBmInfo bmInfo = {0};

View File

@ -32,7 +32,9 @@ typedef struct SDnodeMgmt {
SSingleWorker mgmtWorker;
ProcessCreateNodeFp processCreateNodeFp;
ProcessDropNodeFp processDropNodeFp;
IsNodeRequiredFp isNodeRequiredFp;
SendMonitorReportFp sendMonitorReportFp;
GetVnodeLoadsFp getVnodeLoadsFp;
GetMnodeLoadsFp getMnodeLoadsFp;
} SDnodeMgmt;
// dmHandle.c
@ -43,11 +45,6 @@ int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
// dmMonitor.c
void dmGetVnodeLoads(SDnodeMgmt *pMgmt, SMonVloadInfo *pInfo);
void dmGetMnodeLoads(SDnodeMgmt *pMgmt, SMonMloadInfo *pInfo);
void dmSendMonitorReport(SDnodeMgmt *pMgmt);
// dmWorker.c
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt);

View File

@ -72,11 +72,11 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
taosRUnLockLatch(&pMgmt->pData->latch);
SMonVloadInfo vinfo = {0};
dmGetVnodeLoads(pMgmt, &vinfo);
(*pMgmt->getVnodeLoadsFp)(&vinfo);
req.pVloads = vinfo.pVloads;
SMonMloadInfo minfo = {0};
dmGetMnodeLoads(pMgmt, &minfo);
(*pMgmt->getMnodeLoadsFp)(&minfo);
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
@ -115,7 +115,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
SServerStatusRsp statusRsp = {0};
SMonMloadInfo minfo = {0};
dmGetMnodeLoads(pMgmt, &minfo);
(*pMgmt->getMnodeLoadsFp)(&minfo);
if (minfo.isMnode && minfo.load.syncState == TAOS_SYNC_STATE_ERROR) {
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
@ -123,7 +123,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
}
SMonVloadInfo vinfo = {0};
dmGetVnodeLoads(pMgmt, &vinfo);
(*pMgmt->getVnodeLoadsFp)(&vinfo);
for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
if (pLoad->syncState == TAOS_SYNC_STATE_ERROR) {

View File

@ -45,7 +45,9 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->name = pInput->name;
pMgmt->processCreateNodeFp = pInput->processCreateNodeFp;
pMgmt->processDropNodeFp = pInput->processDropNodeFp;
pMgmt->isNodeRequiredFp = pInput->isNodeRequiredFp;
pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp;
pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp;
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;
if (dmStartWorker(pMgmt) != 0) {
return -1;

View File

@ -1,104 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dmInt.h"
#define dmSendLocalRecv(pMgmt, mtype, func, pInfo) \
if (!tsMultiProcess) { \
SRpcMsg rsp = {0}; \
SRpcMsg req = {.msgType = mtype}; \
SEpSet epset = {.inUse = 0, .numOfEps = 1}; \
tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); \
epset.eps[0].port = tsServerPort; \
rpcSendRecv(pMgmt->msgCb.clientRpc, &epset, &req, &rsp); \
if (rsp.code == 0 && rsp.contLen > 0) { \
func(rsp.pCont, rsp.contLen, pInfo); \
} \
rpcFreeCont(rsp.pCont); \
}
static void dmGetMonitorBasicInfo(SDnodeMgmt *pMgmt, SMonBasicInfo *pInfo) {
pInfo->protocol = 1;
pInfo->dnode_id = pMgmt->pData->dnodeId;
pInfo->cluster_id = pMgmt->pData->clusterId;
tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN);
}
static void dmGetMonitorDnodeInfo(SDnodeMgmt *pMgmt, SMonDnodeInfo *pInfo) {
pInfo->uptime = (taosGetTimestampMs() - pMgmt->pData->rebootTime) / (86400000.0f);
pInfo->has_mnode = (*pMgmt->isNodeRequiredFp)(MNODE);
pInfo->has_qnode = (*pMgmt->isNodeRequiredFp)(QNODE);
pInfo->has_snode = (*pMgmt->isNodeRequiredFp)(SNODE);
pInfo->has_bnode = (*pMgmt->isNodeRequiredFp)(BNODE);
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
pInfo->logdir.size = tsLogSpace.size;
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
pInfo->tempdir.size = tsTempSpace.size;
}
static void dmGetMonitorInfo(SDnodeMgmt *pMgmt, SMonDmInfo *pInfo) {
dmGetMonitorBasicInfo(pMgmt, &pInfo->basic);
dmGetMonitorDnodeInfo(pMgmt, &pInfo->dnode);
dmGetMonitorSystemInfo(&pInfo->sys);
}
void dmSendMonitorReport(SDnodeMgmt *pMgmt) {
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return;
dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort);
SMonDmInfo dmInfo = {0};
SMonMmInfo mmInfo = {0};
SMonVmInfo vmInfo = {0};
SMonQmInfo qmInfo = {0};
SMonSmInfo smInfo = {0};
SMonBmInfo bmInfo = {0};
dmGetMonitorInfo(pMgmt, &dmInfo);
dmSendLocalRecv(pMgmt, TDMT_MON_VM_INFO, tDeserializeSMonVmInfo, &vmInfo);
if (dmInfo.dnode.has_mnode) {
dmSendLocalRecv(pMgmt, TDMT_MON_MM_INFO, tDeserializeSMonMmInfo, &mmInfo);
}
if (dmInfo.dnode.has_qnode) {
dmSendLocalRecv(pMgmt, TDMT_MON_QM_INFO, tDeserializeSMonQmInfo, &qmInfo);
}
if (dmInfo.dnode.has_snode) {
dmSendLocalRecv(pMgmt, TDMT_MON_SM_INFO, tDeserializeSMonSmInfo, &smInfo);
}
if (dmInfo.dnode.has_bnode) {
dmSendLocalRecv(pMgmt, TDMT_MON_BM_INFO, tDeserializeSMonBmInfo, &bmInfo);
}
monSetDmInfo(&dmInfo);
monSetMmInfo(&mmInfo);
monSetVmInfo(&vmInfo);
monSetQmInfo(&qmInfo);
monSetSmInfo(&smInfo);
monSetBmInfo(&bmInfo);
tFreeSMonMmInfo(&mmInfo);
tFreeSMonVmInfo(&vmInfo);
tFreeSMonQmInfo(&qmInfo);
tFreeSMonSmInfo(&smInfo);
tFreeSMonBmInfo(&bmInfo);
monSendReport();
}
void dmGetVnodeLoads(SDnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
dmSendLocalRecv(pMgmt, TDMT_MON_VM_LOAD, tDeserializeSMonVloadInfo, pInfo);
}
void dmGetMnodeLoads(SDnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
dmSendLocalRecv(pMgmt, TDMT_MON_MM_LOAD, tDeserializeSMonMloadInfo, pInfo);
}

View File

@ -50,7 +50,7 @@ static void *dmMonitorThreadFp(void *param) {
int64_t curTime = taosGetTimestampMs();
float interval = (curTime - lastTime) / 1000.0f;
if (interval >= tsMonitorInterval) {
dmSendMonitorReport(pMgmt);
(*pMgmt->sendMonitorReportFp)();
lastTime = curTime;
}
}

View File

@ -154,6 +154,6 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) {
return -1;
}
dInfo("successed to write %s, deployed:%d", realfile, deployed);
dDebug("successed to write %s, deployed:%d", realfile, deployed);
return 0;
}

View File

@ -16,8 +16,13 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
static void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *mmInfo) {
mndGetMonitorInfo(pMgmt->pMnode, &mmInfo->cluster, &mmInfo->vgroup, &mmInfo->grant);
void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) {
mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->grant);
}
void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
pInfo->isMnode = 1;
mndGetLoad(pMgmt->pMnode, &pInfo->load);
}
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
@ -45,11 +50,6 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
return 0;
}
static void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
pInfo->isMnode = 1;
mndGetLoad(pMgmt->pMnode, &pInfo->load);
}
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
SMonMloadInfo mloads = {0};
mmGetMnodeLoads(pMgmt, &mloads);

View File

@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "qmInt.h"
static void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {}
void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {}
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) {
SMonQmInfo qmInfo = {0};

View File

@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "smInt.h"
static void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) {
SMonSmInfo smInfo = {0};

View File

@ -128,7 +128,7 @@ int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t
*numOfVnodes = vnodesNum;
code = 0;
dInfo("succcessed to read file %s", file);
dDebug("succcessed to read file %s", file);
_OVER:
if (content != NULL) taosMemoryFree(content);

View File

@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
static void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
if (pInfo->pVloads == NULL) return;
@ -37,7 +37,7 @@ static void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
taosRUnLockLatch(&pMgmt->latch);
}
static void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
SMonVloadInfo vloads = {0};
vmGetVnodeLoads(pMgmt, &vloads);

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_IMP_H_
#define _TD_DND_IMP_H_
#ifndef _TD_DND_MGMT_H_
#define _TD_DND_MGMT_H_
// tobe deleted
#include "uv.h"
@ -165,16 +165,13 @@ SMsgCb dmGetMsgcb(SDnode *pDnode);
int32_t dmInitMsgHandle(SDnode *pDnode);
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
// mgmt nodes
SMgmtFunc dmGetMgmtFunc();
SMgmtFunc bmGetMgmtFunc();
SMgmtFunc qmGetMgmtFunc();
SMgmtFunc smGetMgmtFunc();
SMgmtFunc vmGetMgmtFunc();
SMgmtFunc mmGetMgmtFunc();
// dmMonitor.c
void dmSendMonitorReport();
void dmGetVnodeLoads(SMonVloadInfo *pInfo);
void dmGetMnodeLoads(SMonMloadInfo *pInfo);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_IMP_H_*/
#endif /*_TD_DND_MGMT_H_*/

View File

@ -0,0 +1,45 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_NODES_H_
#define _TD_DND_NODES_H_
#include "dmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
SMgmtFunc dmGetMgmtFunc();
SMgmtFunc bmGetMgmtFunc();
SMgmtFunc qmGetMgmtFunc();
SMgmtFunc smGetMgmtFunc();
SMgmtFunc vmGetMgmtFunc();
SMgmtFunc mmGetMgmtFunc();
void mmGetMonitorInfo(void *pMgmt, SMonMmInfo *pInfo);
void vmGetMonitorInfo(void *pMgmt, SMonVmInfo *pInfo);
void qmGetMonitorInfo(void *pMgmt, SMonQmInfo *pInfo);
void smGetMonitorInfo(void *pMgmt, SMonSmInfo *pInfo);
void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo);
void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo);
void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_NODES_H_*/

View File

@ -168,11 +168,6 @@ static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
return code;
}
static bool dmIsNodeRequired(EDndNodeType ntype) {
SDnode *pDnode = dmInstance();
return pDnode->wrappers[ntype].required;
}
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
SMgmtInputOpt opt = {
.path = pWrapper->path,
@ -180,7 +175,9 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
.pData = &pWrapper->pDnode->data,
.processCreateNodeFp = dmProcessCreateNodeReq,
.processDropNodeFp = dmProcessDropNodeReq,
.isNodeRequiredFp = dmIsNodeRequired,
.sendMonitorReportFp = dmSendMonitorReport,
.getVnodeLoadsFp = dmGetVnodeLoads,
.getMnodeLoadsFp = dmGetMnodeLoads,
};
opt.msgCb = dmGetMsgcb(pWrapper->pDnode);

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
#include "dmNodes.h"
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
@ -189,7 +190,7 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) {
}
dmReportStartup("dnode-transport", "initialized");
dInfo("dnode is created, ptr:%p", pDnode);
dDebug("dnode is created, ptr:%p", pDnode);
code = 0;
_OVER:
@ -208,7 +209,7 @@ void dmCleanupDnode(SDnode *pDnode) {
dmCleanupClient(pDnode);
dmCleanupServer(pDnode);
dmClearVars(pDnode);
dInfo("dnode is closed, ptr:%p", pDnode);
dDebug("dnode is closed, ptr:%p", pDnode);
}
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {

View File

@ -0,0 +1,170 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
#include "dmNodes.h"
#define dmSendLocalRecv(pDnode, mtype, func, pInfo) \
SRpcMsg rsp = {0}; \
SRpcMsg req = {.msgType = mtype}; \
SEpSet epset = {.inUse = 0, .numOfEps = 1}; \
tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); \
epset.eps[0].port = tsServerPort; \
rpcSendRecv(pDnode->trans.clientRpc, &epset, &req, &rsp); \
if (rsp.code == 0 && rsp.contLen > 0) { \
func(rsp.pCont, rsp.contLen, pInfo); \
} \
rpcFreeCont(rsp.pCont);
static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
pInfo->protocol = 1;
pInfo->dnode_id = pDnode->data.dnodeId;
pInfo->cluster_id = pDnode->data.clusterId;
tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN);
}
static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) / (86400000.0f);
pInfo->has_mnode = pDnode->wrappers[MNODE].required;
pInfo->has_qnode = pDnode->wrappers[QNODE].required;
pInfo->has_snode = pDnode->wrappers[SNODE].required;
pInfo->has_bnode = pDnode->wrappers[BNODE].required;
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
pInfo->logdir.size = tsLogSpace.size;
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
pInfo->tempdir.size = tsTempSpace.size;
}
static void dmGetDmMonitorInfo(SDnode *pDnode) {
SMonDmInfo dmInfo = {0};
dmGetMonitorBasicInfo(pDnode, &dmInfo.basic);
dmGetMonitorDnodeInfo(pDnode, &dmInfo.dnode);
dmGetMonitorSystemInfo(&dmInfo.sys);
monSetDmInfo(&dmInfo);
}
static void dmGetMmMonitorInfo(SDnode *pDnode) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE];
if (dmMarkWrapper(pWrapper) == 0) {
SMonMmInfo mmInfo = {0};
if (tsMultiProcess) {
dmSendLocalRecv(pDnode, TDMT_MON_MM_INFO, tDeserializeSMonMmInfo, &mmInfo);
} else if (pWrapper->pMgmt != NULL) {
mmGetMonitorInfo(pWrapper->pMgmt, &mmInfo);
}
dmReleaseWrapper(pWrapper);
monSetMmInfo(&mmInfo);
tFreeSMonMmInfo(&mmInfo);
}
}
static void dmGetVmMonitorInfo(SDnode *pDnode) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE];
if (dmMarkWrapper(pWrapper) == 0) {
SMonVmInfo vmInfo = {0};
if (tsMultiProcess) {
dmSendLocalRecv(pDnode, TDMT_MON_VM_INFO, tDeserializeSMonVmInfo, &vmInfo);
} else if (pWrapper->pMgmt != NULL) {
vmGetMonitorInfo(pWrapper->pMgmt, &vmInfo);
}
dmReleaseWrapper(pWrapper);
monSetVmInfo(&vmInfo);
tFreeSMonVmInfo(&vmInfo);
}
}
static void dmGetQmMonitorInfo(SDnode *pDnode) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[QNODE];
if (dmMarkWrapper(pWrapper) == 0) {
SMonQmInfo qmInfo = {0};
if (tsMultiProcess) {
dmSendLocalRecv(pDnode, TDMT_MON_QM_INFO, tDeserializeSMonQmInfo, &qmInfo);
} else if (pWrapper->pMgmt != NULL) {
qmGetMonitorInfo(pWrapper->pMgmt, &qmInfo);
}
dmReleaseWrapper(pWrapper);
monSetQmInfo(&qmInfo);
tFreeSMonQmInfo(&qmInfo);
}
}
static void dmGetSmMonitorInfo(SDnode *pDnode) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[SNODE];
if (dmMarkWrapper(pWrapper) == 0) {
SMonSmInfo smInfo = {0};
if (tsMultiProcess) {
dmSendLocalRecv(pDnode, TDMT_MON_SM_INFO, tDeserializeSMonSmInfo, &smInfo);
} else if (pWrapper->pMgmt != NULL) {
smGetMonitorInfo(pWrapper->pMgmt, &smInfo);
}
dmReleaseWrapper(pWrapper);
monSetSmInfo(&smInfo);
tFreeSMonSmInfo(&smInfo);
}
}
static void dmGetBmMonitorInfo(SDnode *pDnode) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[BNODE];
if (dmMarkWrapper(pWrapper) == 0) {
SMonBmInfo bmInfo = {0};
if (tsMultiProcess) {
dmSendLocalRecv(pDnode, TDMT_MON_BM_INFO, tDeserializeSMonBmInfo, &bmInfo);
} else if (pWrapper->pMgmt != NULL) {
bmGetMonitorInfo(pWrapper->pMgmt, &bmInfo);
}
dmReleaseWrapper(pWrapper);
monSetBmInfo(&bmInfo);
tFreeSMonBmInfo(&bmInfo);
}
}
void dmSendMonitorReport() {
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return;
dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort);
SDnode *pDnode = dmInstance();
dmGetDmMonitorInfo(pDnode);
dmGetMmMonitorInfo(pDnode);
dmGetVmMonitorInfo(pDnode);
dmGetQmMonitorInfo(pDnode);
dmGetSmMonitorInfo(pDnode);
dmGetBmMonitorInfo(pDnode);
monSendReport();
}
void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
SDnode *pDnode = dmInstance();
SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE];
if (dmMarkWrapper(pWrapper) == 0) {
if (tsMultiProcess) {
dmSendLocalRecv(pDnode, TDMT_MON_VM_LOAD, tDeserializeSMonVloadInfo, pInfo);
} else if (pWrapper->pMgmt != NULL) {
vmGetVnodeLoads(pWrapper->pMgmt, pInfo);
}
dmReleaseWrapper(pWrapper);
}
}
void dmGetMnodeLoads(SMonMloadInfo *pInfo) {
SDnode *pDnode = dmInstance();
SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE];
if (tsMultiProcess) {
dmSendLocalRecv(pDnode, TDMT_MON_MM_LOAD, tDeserializeSMonMloadInfo, pInfo);
} else if (pWrapper->pMgmt != NULL) {
mmGetMnodeLoads(pWrapper->pMgmt, pInfo);
}
dmReleaseWrapper(pWrapper);
}

View File

@ -89,21 +89,23 @@ typedef enum {
typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
typedef bool (*IsNodeRequiredFp)(EDndNodeType ntype);
typedef void (*SendMonitorReportFp)();
typedef void (*GetVnodeLoadsFp)();
typedef void (*GetMnodeLoadsFp)();
typedef struct {
int32_t dnodeId;
int64_t clusterId;
int64_t dnodeVer;
int64_t updateTime;
int64_t rebootTime;
bool dropped;
bool stopped;
SEpSet mnodeEps;
SArray *dnodeEps;
SHashObj *dnodeHash;
SRWLatch latch;
SMsgCb msgCb;
int32_t dnodeId;
int64_t clusterId;
int64_t dnodeVer;
int64_t updateTime;
int64_t rebootTime;
bool dropped;
bool stopped;
SEpSet mnodeEps;
SArray *dnodeEps;
SHashObj *dnodeHash;
SRWLatch latch;
SMsgCb msgCb;
} SDnodeData;
typedef struct {
@ -113,7 +115,9 @@ typedef struct {
SMsgCb msgCb;
ProcessCreateNodeFp processCreateNodeFp;
ProcessDropNodeFp processDropNodeFp;
IsNodeRequiredFp isNodeRequiredFp;
SendMonitorReportFp sendMonitorReportFp;
GetVnodeLoadsFp getVnodeLoadsFp;
GetMnodeLoadsFp getMnodeLoadsFp;
} SMgmtInputOpt;
typedef struct {

View File

@ -365,6 +365,8 @@ typedef struct {
int64_t uid;
int64_t dbUid;
int32_t version;
int32_t tagVer;
int32_t colVer;
int32_t nextColId;
float xFilesFactor;
int32_t delay;
@ -463,7 +465,7 @@ typedef struct {
typedef struct {
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
char appId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t updateType; // used only for update
int32_t epoch;
int32_t status;

View File

@ -427,6 +427,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
if (pConsumerOld == NULL) {
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
pConsumerNew->rebNewTopics = newSub;
subscribe.topicNames = NULL;
@ -848,11 +849,11 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
// app id
char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(appId), pConsumer->appId, TSDB_CGROUP_LEN);
varDataSetLen(appId, strlen(varDataVal(appId)));
char clientId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(clientId), pConsumer->clientId, TSDB_CGROUP_LEN);
varDataSetLen(clientId, strlen(varDataVal(clientId)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)appId, false);
colDataAppend(pColInfo, numOfRows, (const char *)clientId, false);
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};

View File

@ -88,6 +88,8 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT64(pRaw, dataPos, pStb->uid, _OVER)
SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->version, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->tagVer, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->colVer, _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, _OVER)
SDB_SET_INT32(pRaw, dataPos, (int32_t)(pStb->xFilesFactor * 10000), _OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->delay, _OVER)
@ -166,6 +168,8 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pStb->uid, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->version, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->tagVer, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->colVer, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, _OVER)
int32_t xFilesFactor = 0;
SDB_GET_INT32(pRaw, dataPos, &xFilesFactor, _OVER)
@ -317,6 +321,8 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
pOld->updateTime = pNew->updateTime;
pOld->version = pNew->version;
pOld->tagVer = pNew->tagVer;
pOld->colVer = pNew->colVer;
pOld->nextColId = pNew->nextColId;
pOld->ttl = pNew->ttl;
pOld->numOfColumns = pNew->numOfColumns;
@ -384,6 +390,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req.rollup = pStb->ast1Len > 0 ? 1 : 0;
req.schema.nCols = pStb->numOfColumns;
req.schema.sver = pStb->version;
req.schema.tagVer = pStb->tagVer;
req.schema.colVer = pStb->colVer;
req.schema.pSchema = pStb->pColumns;
req.schemaTag.nCols = pStb->numOfTags;
req.schemaTag.sver = 1;
@ -657,6 +665,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
pDst->uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
pDst->dbUid = pDb->uid;
pDst->version = 1;
pDst->tagVer = 1;
pDst->colVer = 1;
pDst->nextColId = 1;
pDst->xFilesFactor = pCreate->xFilesFactor;
pDst->delay = pCreate->delay;
@ -949,6 +959,7 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
}
pNew->version++;
pNew->tagVer++;
return 0;
}
@ -967,6 +978,7 @@ static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const ch
pNew->numOfTags--;
pNew->version++;
pNew->tagVer++;
mDebug("stb:%s, start to drop tag %s", pNew->name, tagName);
return 0;
}
@ -1007,6 +1019,7 @@ static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pF
memcpy(pSchema->name, newTagName, TSDB_COL_NAME_LEN);
pNew->version++;
pNew->tagVer++;
mDebug("stb:%s, start to modify tag %s to %s", pNew->name, oldTagName, newTagName);
return 0;
}
@ -1036,6 +1049,7 @@ static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SFi
pTag->bytes = pField->bytes;
pNew->version++;
pNew->tagVer++;
mDebug("stb:%s, start to modify tag len %s to %d", pNew->name, pField->name, pField->bytes);
return 0;
@ -1075,6 +1089,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
}
pNew->version++;
pNew->colVer++;
return 0;
}
@ -1103,6 +1118,7 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const
pNew->numOfColumns--;
pNew->version++;
pNew->colVer++;
mDebug("stb:%s, start to drop col %s", pNew->name, colName);
return 0;
}
@ -1141,6 +1157,7 @@ static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const
pCol->bytes = pField->bytes;
pNew->version++;
pNew->colVer++;
mDebug("stb:%s, start to modify col len %s to %d", pNew->name, pField->name, pField->bytes);
return 0;
@ -1300,6 +1317,13 @@ static int32_t mndProcessMAlterStbReq(SRpcMsg *pReq) {
goto _OVER;
}
if (alterReq.verInBlock > 0 && alterReq.verInBlock <= pStb->version) {
mDebug("stb:%s, already exist, verInBlock:%d smaller than verInStb:%d, alter success", alterReq.name,
alterReq.verInBlock, pStb->version);
code = 0;
goto _OVER;
}
pUser = mndAcquireUser(pMnode, pReq->conn.user);
if (pUser == NULL) {
goto _OVER;

View File

@ -492,8 +492,8 @@ static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTo
}
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SMnode *pMnode = pReq->info.node;
/*SSdb *pSdb = pMnode->pSdb;*/
SMDropTopicReq dropReq = {0};
if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
@ -502,16 +502,16 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
// if (pTopic == NULL) {
// if (dropReq.igNotExists) {
// mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
// return 0;
// } else {
// terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
// mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
// return -1;
// }
// }
if (pTopic == NULL) {
if (dropReq.igNotExists) {
mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
return 0;
} else {
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
return -1;
}
}
if (pTopic->refConsumerCnt != 0) {
mndReleaseTopic(pMnode, pTopic);
@ -528,12 +528,10 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
#if 1
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
ASSERT(0);
return -1;
}
#endif
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
ASSERT(0);

View File

@ -32,7 +32,7 @@ class MndTestStb : public ::testing::Test {
void* BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes, int32_t* pContLen);
void* BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen);
void* BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen);
void* BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes, int32_t* pContLen);
void* BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes, int32_t* pContLen, int32_t verInBlock);
};
Testbase MndTestStb::test;
@ -271,12 +271,13 @@ void* MndTestStb::BuildAlterStbDropColumnReq(const char* stbname, const char* co
}
void* MndTestStb::BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes,
int32_t* pContLen) {
int32_t* pContLen, int32_t verInBlock) {
SMAlterStbReq req = {0};
strcpy(req.name, stbname);
req.numOfFields = 1;
req.pFields = taosArrayInit(1, sizeof(SField));
req.alterType = TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES;
req.verInBlock = verInBlock;
SField field = {0};
field.bytes = bytes;
@ -781,31 +782,40 @@ TEST_F(MndTestStb, 08_Alter_Stb_AlterTagBytes) {
}
{
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col5", 12, &contLen);
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col5", 12, &contLen, 0);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_COLUMN_NOT_EXIST);
}
{
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "ts", 8, &contLen);
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "ts", 8, &contLen, 0);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_STB_OPTION);
}
{
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", 8, &contLen);
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", 8, &contLen, 0);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_ROW_BYTES);
}
{
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", TSDB_MAX_BYTES_PER_ROW, &contLen);
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", TSDB_MAX_BYTES_PER_ROW, &contLen, 0);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_ROW_BYTES);
}
{
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", 20, &contLen);
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", 20, &contLen, 0);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
ASSERT_EQ(pRsp->code, 0);
test.SendShowReq(TSDB_MGMT_TABLE_STB, "user_stables", dbname);
EXPECT_EQ(test.GetShowRows(), 1);
}
{
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col_not_exist", 20, &contLen, 1);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
ASSERT_EQ(pRsp->code, 0);

View File

@ -104,6 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep
int tsdbClose(STsdb** pTsdb);
int tsdbBegin(STsdb* pTsdb);
int tsdbCommit(STsdb* pTsdb);
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,

View File

@ -111,6 +111,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList) {
pIter = taosHashIterate(pTq->execs, pIter);
if (pIter == NULL) break;
pExec = (STqExec*)pIter;
if (pExec->subType == TOPIC_SUB_TYPE__DB) continue;
for (int32_t i = 0; i < 5; i++) {
int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, true);
ASSERT(code == 0);

View File

@ -320,6 +320,8 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
}
strcat(pRsp->tblFName, mr.me.name);
if (mr.me.type == TSDB_NORMAL_TABLE) {
sverNew = mr.me.ntbEntry.schema.sver;
} else {

View File

@ -15,7 +15,7 @@
#include "tsdb.h"
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
// static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
SSubmitMsgIter msgIter = {0};
@ -54,7 +54,38 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
return 0;
}
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
#if 0
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, STable *pTable, STSRow *row, TSKEY minKey, TSKEY maxKey,
TSKEY now) {
TSKEY rowKey = TD_ROW_KEY(row);
if (rowKey < minKey || rowKey > maxKey) {
tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
" maxKey %" PRId64 " row key %" PRId64,
REPO_ID(pTsdb), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey,
rowKey);
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
return -1;
}
return 0;
}
#endif
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *row, TSKEY minKey, TSKEY maxKey,
TSKEY now) {
TSKEY rowKey = TD_ROW_KEY(row);
if (rowKey < minKey || rowKey > maxKey) {
tsdbError("vgId:%d table uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
" maxKey %" PRId64 " row key %" PRId64,
REPO_ID(pTsdb), uid, now, minKey, maxKey, rowKey);
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
return -1;
}
return 0;
}
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg) {
ASSERT(pMsg != NULL);
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
SSubmitMsgIter msgIter = {0};
@ -112,14 +143,14 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
return -1;
}
}
tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCheckRowRange(pTsdb, pTable, row, minKey, maxKey, now) < 0) {
#endif
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCheckRowRange(pTsdb, msgIter.uid, row, minKey, maxKey, now) < 0) {
return -1;
}
}
#endif
}
if (terrno != TSDB_CODE_SUCCESS) return -1;

View File

@ -631,6 +631,11 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
#endif
if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
pRsp->code = terrno;
goto _exit;
}
// handle the request
if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
pRsp->code = TSDB_CODE_INVALID_MSG;
@ -681,6 +686,9 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
tDecoderClear(&decoder);
} else {
submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN);
sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
}
if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) {

View File

@ -2883,6 +2883,110 @@ _return:
CTG_API_LEAVE(code);
}
int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* sver) {
*sver = -1;
if (NULL == pCtg->dbCache) {
ctgDebug("empty tbmeta cache, tbName:%s", pTableName->tname);
return TSDB_CODE_SUCCESS;
}
SCtgDBCache *dbCache = NULL;
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, dbFName);
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %s not in cache", pTableName->tname);
return TSDB_CODE_SUCCESS;
}
int32_t tbType = 0;
uint64_t suid = 0;
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
STableMeta* tbMeta = taosHashGet(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname));
if (tbMeta) {
tbType = tbMeta->tableType;
suid = tbMeta->suid;
if (tbType != TSDB_CHILD_TABLE) {
*sver = tbMeta->sversion;
}
}
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
if (NULL == tbMeta) {
ctgReleaseDBCache(pCtg, dbCache);
return TSDB_CODE_SUCCESS;
}
if (tbType != TSDB_CHILD_TABLE) {
ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, tbType, dbFName, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
ctgDebug("Got subtable meta from cache, dbFName:%s, tbName:%s, suid:%" PRIx64, dbFName, pTableName->tname, suid);
CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock);
STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &suid, sizeof(suid));
if (NULL == stbMeta || NULL == *stbMeta) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("stb not in stbCache, suid:%"PRIx64, suid);
return TSDB_CODE_SUCCESS;
}
if ((*stbMeta)->suid != suid) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgReleaseDBCache(pCtg, dbCache);
ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, suid, (*stbMeta)->suid);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
*sver = (*stbMeta)->sversion;
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, tbType, dbFName, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pTables) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTables) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
SName name;
int32_t sver = 0;
int32_t tbNum = taosArrayGetSize(pTables);
for (int32_t i = 0; i < tbNum; ++i) {
STbSVersion* pTb = (STbSVersion*)taosArrayGet(pTables, i);
tNameFromString(&name, pTb->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (CTG_IS_SYS_DBNAME(name.dbname)) {
continue;
}
ctgGetTbSverFromCache(pCtg, &name, &sver);
if (sver >= 0 && sver < pTb->sver) {
catalogRemoveTableMeta(pCtg, &name); //TODO REMOVE STB FROM CACHE
}
}
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName) {
CTG_API_ENTER();

View File

@ -869,8 +869,12 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
SScalarParam dest = {.columnData = &idata};
scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
int32_t code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pBlockList);
return code;
}
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
colInfoDataEnsureCapacity(pResColData, startOffset, pResult->info.capacity);
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);

View File

@ -118,21 +118,21 @@ TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t d
}
return tDoCompare(func, cmptype, &va, &vb);
} else if (dtype == TSDB_DATA_TYPE_FLOAT) {
float va = strtod(a, NULL);
float va = taosStr2Float(a, NULL);
if (errno == ERANGE && va == -1) {
return CONTINUE;
}
float vb = strtod(b, NULL);
float vb = taosStr2Float(b, NULL);
if (errno == ERANGE && va == -1) {
return CONTINUE;
}
return tDoCompare(func, cmptype, &va, &vb);
} else if (dtype == TSDB_DATA_TYPE_DOUBLE) {
double va = strtod(a, NULL);
double va = taosStr2Double(a, NULL);
if (errno == ERANGE && va == -1) {
return CONTINUE;
}
double vb = strtod(b, NULL);
double vb = taosStr2Double(b, NULL);
if (errno == ERANGE && va == -1) {
return CONTINUE;
}

View File

@ -123,7 +123,7 @@ static bool checkAndSplitEndpoint(SAstCreateContext* pCxt, const SToken* pEp, ch
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ENDPOINT);
} else {
strncpy(pFqdn, ep, pColon - ep);
*pPort = strtol(pColon + 1, NULL, 10);
*pPort = taosStr2Int32(pColon + 1, NULL, 10);
if (*pPort >= UINT16_MAX || *pPort <= 0) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_PORT);
}
@ -147,7 +147,7 @@ static bool checkPort(SAstCreateContext* pCxt, const SToken* pPortToken, int32_t
if (NULL == pPortToken) {
pCxt->errCode = TSDB_CODE_PAR_SYNTAX_ERROR;
} else {
*pPort = strtol(pPortToken->z, NULL, 10);
*pPort = taosStr2Int32(pPortToken->z, NULL, 10);
if (*pPort >= UINT16_MAX || *pPort <= 0) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_PORT);
}
@ -476,9 +476,9 @@ SNode* createJoinTableNode(SAstCreateContext* pCxt, EJoinType type, SNode* pLeft
SNode* createLimitNode(SAstCreateContext* pCxt, const SToken* pLimit, const SToken* pOffset) {
SLimitNode* limitNode = (SLimitNode*)nodesMakeNode(QUERY_NODE_LIMIT);
CHECK_OUT_OF_MEM(limitNode);
limitNode->limit = strtol(pLimit->z, NULL, 10);
limitNode->limit = taosStr2Int64(pLimit->z, NULL, 10);
if (NULL != pOffset) {
limitNode->offset = strtol(pOffset->z, NULL, 10);
limitNode->offset = taosStr2Int64(pOffset->z, NULL, 10);
}
return (SNode*)limitNode;
}
@ -694,59 +694,59 @@ SNode* createAlterDatabaseOptions(SAstCreateContext* pCxt) {
SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOptionType type, void* pVal) {
switch (type) {
case DB_OPTION_BUFFER:
((SDatabaseOptions*)pOptions)->buffer = strtol(((SToken*)pVal)->z, NULL, 10);
((SDatabaseOptions*)pOptions)->buffer = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
break;
case DB_OPTION_CACHELAST:
((SDatabaseOptions*)pOptions)->cachelast = strtol(((SToken*)pVal)->z, NULL, 10);
((SDatabaseOptions*)pOptions)->cachelast = taosStr2Int8(((SToken*)pVal)->z, NULL, 10);
break;
case DB_OPTION_COMP:
((SDatabaseOptions*)pOptions)->compressionLevel = strtol(((SToken*)pVal)->z, NULL, 10);
((SDatabaseOptions*)pOptions)->compressionLevel = taosStr2Int8(((SToken*)pVal)->z, NULL, 10);
break;
case DB_OPTION_DAYS: {
SToken* pToken = pVal;
if (TK_NK_INTEGER == pToken->type) {
((SDatabaseOptions*)pOptions)->daysPerFile = strtol(pToken->z, NULL, 10) * 1440;
((SDatabaseOptions*)pOptions)->daysPerFile = taosStr2Int32(pToken->z, NULL, 10) * 1440;
} else {
((SDatabaseOptions*)pOptions)->pDaysPerFile = (SValueNode*)createDurationValueNode(pCxt, pToken);
}
break;
}
case DB_OPTION_FSYNC:
((SDatabaseOptions*)pOptions)->fsyncPeriod = strtol(((SToken*)pVal)->z, NULL, 10);
((SDatabaseOptions*)pOptions)->fsyncPeriod = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
break;
case DB_OPTION_MAXROWS:
((SDatabaseOptions*)pOptions)->maxRowsPerBlock = strtol(((SToken*)pVal)->z, NULL, 10);
((SDatabaseOptions*)pOptions)->maxRowsPerBlock = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
break;
case DB_OPTION_MINROWS:
((SDatabaseOptions*)pOptions)->minRowsPerBlock = strtol(((SToken*)pVal)->z, NULL, 10);
((SDatabaseOptions*)pOptions)->minRowsPerBlock = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
break;
case DB_OPTION_KEEP:
((SDatabaseOptions*)pOptions)->pKeep = pVal;
break;
case DB_OPTION_PAGES:
((SDatabaseOptions*)pOptions)->pages = strtol(((SToken*)pVal)->z, NULL, 10);
((SDatabaseOptions*)pOptions)->pages = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
break;
case DB_OPTION_PAGESIZE:
((SDatabaseOptions*)pOptions)->pagesize = strtol(((SToken*)pVal)->z, NULL, 10);
((SDatabaseOptions*)pOptions)->pagesize = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
break;
case DB_OPTION_PRECISION:
copyStringFormStringToken((SToken*)pVal, ((SDatabaseOptions*)pOptions)->precisionStr,
sizeof(((SDatabaseOptions*)pOptions)->precisionStr));
break;
case DB_OPTION_REPLICA:
((SDatabaseOptions*)pOptions)->replica = strtol(((SToken*)pVal)->z, NULL, 10);
((SDatabaseOptions*)pOptions)->replica = taosStr2Int8(((SToken*)pVal)->z, NULL, 10);
break;
case DB_OPTION_STRICT:
((SDatabaseOptions*)pOptions)->strict = strtol(((SToken*)pVal)->z, NULL, 10);
((SDatabaseOptions*)pOptions)->strict = taosStr2Int8(((SToken*)pVal)->z, NULL, 10);
break;
case DB_OPTION_WAL:
((SDatabaseOptions*)pOptions)->walLevel = strtol(((SToken*)pVal)->z, NULL, 10);
((SDatabaseOptions*)pOptions)->walLevel = taosStr2Int8(((SToken*)pVal)->z, NULL, 10);
break;
case DB_OPTION_VGROUPS:
((SDatabaseOptions*)pOptions)->numOfVgroups = strtol(((SToken*)pVal)->z, NULL, 10);
((SDatabaseOptions*)pOptions)->numOfVgroups = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
break;
case DB_OPTION_SINGLE_STABLE:
((SDatabaseOptions*)pOptions)->singleStable = strtol(((SToken*)pVal)->z, NULL, 10);
((SDatabaseOptions*)pOptions)->singleStable = taosStr2Int8(((SToken*)pVal)->z, NULL, 10);
break;
case DB_OPTION_RETENTIONS:
((SDatabaseOptions*)pOptions)->pRetentions = pVal;
@ -827,16 +827,16 @@ SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType
sizeof(((STableOptions*)pOptions)->comment));
break;
case TABLE_OPTION_DELAY:
((STableOptions*)pOptions)->delay = strtol(((SToken*)pVal)->z, NULL, 10);
((STableOptions*)pOptions)->delay = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
break;
case TABLE_OPTION_FILE_FACTOR:
((STableOptions*)pOptions)->filesFactor = strtod(((SToken*)pVal)->z, NULL);
((STableOptions*)pOptions)->filesFactor = taosStr2Float(((SToken*)pVal)->z, NULL);
break;
case TABLE_OPTION_ROLLUP:
((STableOptions*)pOptions)->pRollupFuncs = pVal;
break;
case TABLE_OPTION_TTL:
((STableOptions*)pOptions)->ttl = strtol(((SToken*)pVal)->z, NULL, 10);
((STableOptions*)pOptions)->ttl = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
break;
case TABLE_OPTION_SMA:
((STableOptions*)pOptions)->pSma = pVal;
@ -868,7 +868,7 @@ SDataType createDataType(uint8_t type) {
}
SDataType createVarLenDataType(uint8_t type, const SToken* pLen) {
SDataType dt = {.type = type, .precision = 0, .scale = 0, .bytes = strtol(pLen->z, NULL, 10)};
SDataType dt = {.type = type, .precision = 0, .scale = 0, .bytes = taosStr2Int16(pLen->z, NULL, 10)};
return dt;
}
@ -1119,7 +1119,7 @@ SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode) {
SDropDnodeStmt* pStmt = (SDropDnodeStmt*)nodesMakeNode(QUERY_NODE_DROP_DNODE_STMT);
CHECK_OUT_OF_MEM(pStmt);
if (TK_NK_INTEGER == pDnode->type) {
pStmt->dnodeId = strtol(pDnode->z, NULL, 10);
pStmt->dnodeId = taosStr2Int32(pDnode->z, NULL, 10);
} else {
if (!checkAndSplitEndpoint(pCxt, pDnode, pStmt->fqdn, &pStmt->port)) {
nodesDestroyNode(pStmt);
@ -1133,7 +1133,7 @@ SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const
const SToken* pValue) {
SAlterDnodeStmt* pStmt = nodesMakeNode(QUERY_NODE_ALTER_DNODE_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->dnodeId = strtol(pDnode->z, NULL, 10);
pStmt->dnodeId = taosStr2Int32(pDnode->z, NULL, 10);
trimString(pConfig->z, pConfig->n, pStmt->config, sizeof(pStmt->config));
if (NULL != pValue) {
trimString(pValue->z, pValue->n, pStmt->value, sizeof(pStmt->value));
@ -1183,7 +1183,7 @@ SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken
SNode* createCreateComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId) {
SCreateComponentNodeStmt* pStmt = nodesMakeNode(type);
CHECK_OUT_OF_MEM(pStmt);
pStmt->dnodeId = strtol(pDnodeId->z, NULL, 10);
pStmt->dnodeId = taosStr2Int32(pDnodeId->z, NULL, 10);
;
return (SNode*)pStmt;
}
@ -1191,7 +1191,7 @@ SNode* createCreateComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, co
SNode* createDropComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId) {
SDropComponentNodeStmt* pStmt = nodesMakeNode(type);
CHECK_OUT_OF_MEM(pStmt);
pStmt->dnodeId = strtol(pDnodeId->z, NULL, 10);
pStmt->dnodeId = taosStr2Int32(pDnodeId->z, NULL, 10);
;
return (SNode*)pStmt;
}
@ -1251,7 +1251,7 @@ SNode* setExplainVerbose(SAstCreateContext* pCxt, SNode* pOptions, const SToken*
}
SNode* setExplainRatio(SAstCreateContext* pCxt, SNode* pOptions, const SToken* pVal) {
((SExplainOptions*)pOptions)->ratio = strtod(pVal->z, NULL);
((SExplainOptions*)pOptions)->ratio = taosStr2Double(pVal->z, NULL);
return pOptions;
}
@ -1347,7 +1347,7 @@ SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId) {
SKillStmt* pStmt = nodesMakeNode(type);
CHECK_OUT_OF_MEM(pStmt);
pStmt->targetId = strtol(pId->z, NULL, 10);
pStmt->targetId = taosStr2Int32(pId->z, NULL, 10);
return (SNode*)pStmt;
}

View File

@ -442,7 +442,7 @@ static bool isNullStr(SToken* pToken) {
static FORCE_INLINE int32_t toDouble(SToken* pToken, double* value, char** endPtr) {
errno = 0;
*value = strtold(pToken->z, endPtr);
*value = taosStr2Double(pToken->z, endPtr);
// not a valid integer number, return error
if ((*endPtr - pToken->z) != pToken->n) {
@ -482,9 +482,9 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int
return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
}
} else if (pToken->type == TK_NK_INTEGER) {
return func(pMsgBuf, ((strtoll(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
return func(pMsgBuf, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
} else if (pToken->type == TK_NK_FLOAT) {
return func(pMsgBuf, ((strtod(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
return func(pMsgBuf, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
} else {
return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
}

View File

@ -498,7 +498,7 @@ static int32_t parseTimeFromValueNode(SValueNode* pVal) {
return TSDB_CODE_SUCCESS;
}
char* pEnd = NULL;
pVal->datum.i = strtoll(pVal->literal, &pEnd, 10);
pVal->datum.i = taosStr2Int64(pVal->literal, &pEnd, 10);
return (NULL != pEnd && '\0' == *pEnd) ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
} else {
return TSDB_CODE_FAILED;
@ -527,61 +527,61 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD
break;
case TSDB_DATA_TYPE_TINYINT: {
char* endPtr = NULL;
pVal->datum.i = strtoll(pVal->literal, &endPtr, 10);
pVal->datum.i = taosStr2Int64(pVal->literal, &endPtr, 10);
*(int8_t*)&pVal->typeData = pVal->datum.i;
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
char* endPtr = NULL;
pVal->datum.i = strtoll(pVal->literal, &endPtr, 10);
pVal->datum.i = taosStr2Int64(pVal->literal, &endPtr, 10);
*(int16_t*)&pVal->typeData = pVal->datum.i;
break;
}
case TSDB_DATA_TYPE_INT: {
char* endPtr = NULL;
pVal->datum.i = strtoll(pVal->literal, &endPtr, 10);
pVal->datum.i = taosStr2Int64(pVal->literal, &endPtr, 10);
*(int32_t*)&pVal->typeData = pVal->datum.i;
break;
}
case TSDB_DATA_TYPE_BIGINT: {
char* endPtr = NULL;
pVal->datum.i = strtoll(pVal->literal, &endPtr, 10);
pVal->datum.i = taosStr2Int64(pVal->literal, &endPtr, 10);
*(int64_t*)&pVal->typeData = pVal->datum.i;
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
char* endPtr = NULL;
pVal->datum.u = strtoull(pVal->literal, &endPtr, 10);
pVal->datum.u = taosStr2UInt64(pVal->literal, &endPtr, 10);
*(uint8_t*)&pVal->typeData = pVal->datum.u;
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
char* endPtr = NULL;
pVal->datum.u = strtoull(pVal->literal, &endPtr, 10);
pVal->datum.u = taosStr2UInt64(pVal->literal, &endPtr, 10);
*(uint16_t*)&pVal->typeData = pVal->datum.u;
break;
}
case TSDB_DATA_TYPE_UINT: {
char* endPtr = NULL;
pVal->datum.u = strtoull(pVal->literal, &endPtr, 10);
pVal->datum.u = taosStr2UInt64(pVal->literal, &endPtr, 10);
*(uint32_t*)&pVal->typeData = pVal->datum.u;
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
char* endPtr = NULL;
pVal->datum.u = strtoull(pVal->literal, &endPtr, 10);
pVal->datum.u = taosStr2UInt64(pVal->literal, &endPtr, 10);
*(uint64_t*)&pVal->typeData = pVal->datum.u;
break;
}
case TSDB_DATA_TYPE_FLOAT: {
char* endPtr = NULL;
pVal->datum.d = strtold(pVal->literal, &endPtr);
pVal->datum.d = taosStr2Double(pVal->literal, &endPtr);
*(float*)&pVal->typeData = pVal->datum.d;
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
char* endPtr = NULL;
pVal->datum.d = strtold(pVal->literal, &endPtr);
pVal->datum.d = taosStr2Double(pVal->literal, &endPtr);
*(double*)&pVal->typeData = pVal->datum.d;
break;
}

View File

@ -3863,7 +3863,7 @@ static YYACTIONTYPE yy_reduce(
{ yymsp[1].minor.yy140 = 0; }
break;
case 245: /* bufsize_opt ::= BUFSIZE NK_INTEGER */
{ yymsp[-1].minor.yy140 = strtol(yymsp[0].minor.yy0.z, NULL, 10); }
{ yymsp[-1].minor.yy140 = taosStr2Int32(yymsp[0].minor.yy0.z, NULL, 10); }
break;
case 246: /* cmd ::= CREATE STREAM not_exists_opt stream_name stream_options into_opt AS query_expression */
{ pCxt->pRootNode = createCreateStreamStmt(pCxt, yymsp[-5].minor.yy617, &yymsp[-4].minor.yy105, yymsp[-2].minor.yy172, yymsp[-3].minor.yy172, yymsp[0].minor.yy172); }

View File

@ -724,7 +724,7 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
case TSDB_DATA_TYPE_BIGINT: {
if (inputType == TSDB_DATA_TYPE_BINARY) {
memcpy(output, varDataVal(input), varDataLen(input));
*(int64_t *)output = strtoll(output, NULL, 10);
*(int64_t *)output = taosStr2Int64(output, NULL, 10);
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
char *newBuf = taosMemoryCalloc(1, outputLen * TSDB_NCHAR_SIZE + 1);
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
@ -733,7 +733,7 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
return TSDB_CODE_FAILED;
}
newBuf[len] = 0;
*(int64_t *)output = strtoll(newBuf, NULL, 10);
*(int64_t *)output = taosStr2Int64(newBuf, NULL, 10);
taosMemoryFree(newBuf);
} else {
GET_TYPED_DATA(*(int64_t *)output, int64_t, inputType, input);
@ -743,7 +743,7 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
case TSDB_DATA_TYPE_UBIGINT: {
if (inputType == TSDB_DATA_TYPE_BINARY) {
memcpy(output, varDataVal(input), varDataLen(input));
*(uint64_t *)output = strtoull(output, NULL, 10);
*(uint64_t *)output = taosStr2UInt64(output, NULL, 10);
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
char *newBuf = taosMemoryCalloc(1, outputLen * TSDB_NCHAR_SIZE + 1);
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
@ -752,7 +752,7 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
return TSDB_CODE_FAILED;
}
newBuf[len] = 0;
*(uint64_t *)output = strtoull(newBuf, NULL, 10);
*(uint64_t *)output = taosStr2UInt64(newBuf, NULL, 10);
taosMemoryFree(newBuf);
} else {
GET_TYPED_DATA(*(uint64_t *)output, uint64_t, inputType, input);

View File

@ -92,7 +92,7 @@ void convertStringToDouble(const void *inData, void *outData, int8_t inType, int
tmp[len] = 0;
ASSERT(outType == TSDB_DATA_TYPE_DOUBLE);
double value = strtod(tmp, NULL);
double value = taosStr2Double(tmp, NULL);
*((double *)outData) = value;
taosMemoryFreeClear(tmp);
@ -267,22 +267,22 @@ static FORCE_INLINE void varToTimestamp(char *buf, SScalarParam* pOut, int32_t r
static FORCE_INLINE void varToSigned(char *buf, SScalarParam* pOut, int32_t rowIndex) {
switch (pOut->columnData->info.type) {
case TSDB_DATA_TYPE_TINYINT: {
int8_t value = (int8_t)strtoll(buf, NULL, 10);
int8_t value = (int8_t)taosStr2Int8(buf, NULL, 10);
colDataAppendInt8(pOut->columnData, rowIndex, (int8_t*)&value);
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t value = (int16_t)strtoll(buf, NULL, 10);
int16_t value = (int16_t)taosStr2Int16(buf, NULL, 10);
colDataAppendInt16(pOut->columnData, rowIndex, (int16_t*)&value);
break;
}
case TSDB_DATA_TYPE_INT: {
int32_t value = (int32_t)strtoll(buf, NULL, 10);
int32_t value = (int32_t)taosStr2Int32(buf, NULL, 10);
colDataAppendInt32(pOut->columnData, rowIndex, (int32_t*)&value);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t value = (int64_t)strtoll(buf, NULL, 10);
int64_t value = (int64_t)taosStr2Int64(buf, NULL, 10);
colDataAppendInt64(pOut->columnData, rowIndex, (int64_t*)&value);
break;
}
@ -292,22 +292,22 @@ static FORCE_INLINE void varToSigned(char *buf, SScalarParam* pOut, int32_t rowI
static FORCE_INLINE void varToUnsigned(char *buf, SScalarParam* pOut, int32_t rowIndex) {
switch (pOut->columnData->info.type) {
case TSDB_DATA_TYPE_UTINYINT: {
uint8_t value = (uint8_t)strtoull(buf, NULL, 10);
uint8_t value = (uint8_t)taosStr2UInt8(buf, NULL, 10);
colDataAppendInt8(pOut->columnData, rowIndex, (int8_t*)&value);
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
uint16_t value = (uint16_t)strtoull(buf, NULL, 10);
uint16_t value = (uint16_t)taosStr2UInt16(buf, NULL, 10);
colDataAppendInt16(pOut->columnData, rowIndex, (int16_t*)&value);
break;
}
case TSDB_DATA_TYPE_UINT: {
uint32_t value = (uint32_t)strtoull(buf, NULL, 10);
uint32_t value = (uint32_t)taosStr2UInt32(buf, NULL, 10);
colDataAppendInt32(pOut->columnData, rowIndex, (int32_t*)&value);
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
uint64_t value = (uint64_t)strtoull(buf, NULL, 10);
uint64_t value = (uint64_t)taosStr2UInt64(buf, NULL, 10);
colDataAppendInt64(pOut->columnData, rowIndex, (int64_t*)&value);
break;
}
@ -315,12 +315,12 @@ static FORCE_INLINE void varToUnsigned(char *buf, SScalarParam* pOut, int32_t ro
}
static FORCE_INLINE void varToFloat(char *buf, SScalarParam* pOut, int32_t rowIndex) {
double value = strtod(buf, NULL);
double value = taosStr2Double(buf, NULL);
colDataAppendDouble(pOut->columnData, rowIndex, &value);
}
static FORCE_INLINE void varToBool(char *buf, SScalarParam* pOut, int32_t rowIndex) {
int64_t value = strtoll(buf, NULL, 10);
int64_t value = taosStr2Int64(buf, NULL, 10);
bool v = (value != 0)? true:false;
colDataAppendInt8(pOut->columnData, rowIndex, (int8_t*) &v);
}

View File

@ -39,6 +39,12 @@ enum {
SCH_WRITE,
};
typedef enum {
SCH_RES_TYPE_QUERY,
SCH_RES_TYPE_FETCH,
} SCH_RES_TYPE;
typedef struct SSchTrans {
void *transInst;
void *transHandle;
@ -159,7 +165,6 @@ typedef struct SSchTask {
typedef struct SSchJobAttr {
EExplainMode explainMode;
bool needRes;
bool syncSchedule;
bool queryJob;
bool needFlowCtrl;
@ -192,6 +197,7 @@ typedef struct SSchJob {
int32_t errCode;
SArray *errList; // SArray<SQueryErrorInfo>
SRWLatch resLock;
SCH_RES_TYPE resType;
void *resData; //TODO free it or not
int32_t resNumOfRows;
const char *sql;

View File

@ -70,7 +70,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
}
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
int64_t startTs, bool needRes, bool syncSchedule) {
int64_t startTs, bool syncSchedule) {
int32_t code = 0;
int64_t refId = -1;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
@ -81,7 +81,6 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray
pJob->attr.explainMode = pDag->explainInfo.mode;
pJob->attr.syncSchedule = syncSchedule;
pJob->attr.needRes = needRes;
pJob->transport = transport;
pJob->sql = sql;
@ -1059,6 +1058,8 @@ _return:
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);
pJob->resType = SCH_RES_TYPE_FETCH;
atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows));
atomic_store_ptr(&pJob->resData, pRsp);
@ -1179,23 +1180,20 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows);
if (pJob->attr.needRes) {
SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (pJob->resData) {
SSubmitRsp *sum = pJob->resData;
sum->affectedRows += rsp->affectedRows;
sum->nBlocks += rsp->nBlocks;
sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks));
memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks));
taosMemoryFree(rsp->pBlocks);
taosMemoryFree(rsp);
} else {
pJob->resData = rsp;
}
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
pJob->resType = SCH_RES_TYPE_QUERY;
SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (pJob->resData) {
SSubmitRsp *sum = pJob->resData;
sum->affectedRows += rsp->affectedRows;
sum->nBlocks += rsp->nBlocks;
sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks));
memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks));
taosMemoryFree(rsp->pBlocks);
taosMemoryFree(rsp);
} else {
tFreeSSubmitRsp(rsp);
pJob->resData = rsp;
}
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
}
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
@ -2412,7 +2410,7 @@ void schFreeJobImpl(void *job) {
}
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
int64_t startTs, bool needRes, bool syncSchedule) {
int64_t startTs, bool syncSchedule) {
qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) {
@ -2421,7 +2419,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pD
int32_t code = 0;
SSchJob *pJob = NULL;
SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, needRes, syncSchedule));
SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, syncSchedule));
SCH_ERR_JRET(schLaunchJob(pJob));
@ -2463,6 +2461,8 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa
SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));
pJob->resType = SCH_RES_TYPE_FETCH;
int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
if (refId < 0) {
SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
@ -2535,7 +2535,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
}
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, bool needRes, SQueryResult *pRes) {
int64_t startTs, SQueryResult *pRes) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
@ -2543,14 +2543,14 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
} else {
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, needRes, true));
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true));
}
SSchJob *job = schAcquireJob(*pJob);
pRes->code = atomic_load_32(&job->errCode);
pRes->numOfRows = job->resNumOfRows;
if (needRes) {
if (SCH_RES_TYPE_QUERY == job->resType) {
pRes->res = job->resData;
job->resData = NULL;
}
@ -2568,7 +2568,7 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pD
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false));
} else {
SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false, false));
SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false));
}
return TSDB_CODE_SUCCESS;

View File

@ -985,7 +985,7 @@ TEST(insertTest, normalCase) {
taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
SQueryResult res = {0};
code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, false, &res);
code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, &res);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.numOfRows, 20);

View File

@ -27,6 +27,9 @@ if(BUILD_ADDR2LINE)
os PUBLIC addr2line dl z
)
endif ()
if(CHECK_STR2INT_ERROR)
add_definitions(-DTD_CHECK_STR_TO_INT_ERROR)
endif()
target_link_libraries(
os PUBLIC pthread
)

View File

@ -254,4 +254,103 @@ char *taosStrCaseStr(const char *str, const char *pattern) {
}
}
return NULL;
}
int64_t taosStr2Int64(const char *str, char** pEnd, int32_t radix) {
int64_t tmp = strtoll(str, pEnd, radix);
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
#endif
return tmp;
}
uint64_t taosStr2UInt64(const char *str, char** pEnd, int32_t radix) {
uint64_t tmp = strtoull(str, pEnd, radix);
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
#endif
return tmp;
}
int32_t taosStr2Int32(const char *str, char** pEnd, int32_t radix) {
int32_t tmp = strtol(str, pEnd, radix);
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
#endif
return tmp;
}
uint32_t taosStr2UInt32(const char *str, char** pEnd, int32_t radix) {
uint32_t tmp = strtol(str, pEnd, radix);
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
#endif
return tmp;
}
int16_t taosStr2Int16(const char *str, char** pEnd, int32_t radix) {
int32_t tmp = strtol(str, pEnd, radix);
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
assert(tmp >= SHRT_MIN);
assert(tmp <= SHRT_MAX);
#endif
return (int16_t)tmp;
}
uint16_t taosStr2UInt16(const char *str, char** pEnd, int32_t radix) {
uint32_t tmp = strtoul(str, pEnd, radix);
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
assert(tmp <= USHRT_MAX);
#endif
return (uint16_t)tmp;
}
int8_t taosStr2Int8(const char *str, char** pEnd, int32_t radix) {
int32_t tmp = strtol(str, pEnd, radix);
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
assert(tmp >= SCHAR_MIN);
assert(tmp <= SCHAR_MAX);
#endif
return tmp;
}
uint8_t taosStr2UInt8(const char *str, char** pEnd, int32_t radix) {
uint32_t tmp = strtoul(str, pEnd, radix);
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
assert(tmp <= UCHAR_MAX);
#endif
return tmp;
}
double taosStr2Double(const char *str, char** pEnd) {
double tmp = strtod(str, pEnd);
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
assert(tmp != HUGE_VAL);
#endif
return tmp;
}
float taosStr2Float(const char *str, char** pEnd) {
float tmp = strtof(str, pEnd);
#ifdef TD_CHECK_STR_TO_INT_ERROR
assert(errno != ERANGE);
assert(errno != EINVAL);
assert(tmp != HUGE_VALF);
assert(tmp != NAN);
#endif
return tmp;
}

View File

@ -187,7 +187,7 @@ int32_t tjsonGetBigIntValue(const SJson* pJson, const char* pName, int64_t* pVal
sscanf(p,"%lld",pVal);
#else
// sscanf(p,"%ld",pVal);
*pVal = strtol(p, NULL, 10);
*pVal = taosStr2Int64(p, NULL, 10);
#endif
return TSDB_CODE_SUCCESS;
}
@ -222,7 +222,7 @@ int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pV
sscanf(p,"%llu",pVal);
#else
// sscanf(p,"%ld",pVal);
*pVal = strtoul(p, NULL, 10);
*pVal = taosStr2UInt64(p, NULL, 10);
#endif
return TSDB_CODE_SUCCESS;
}

View File

@ -185,10 +185,10 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it
pKey = SL_GET_NODE_KEY(pSkipList, p);
compare = pSkipList->comparFn(pKey, pDataKey);
if (compare >= 0) {
if (compare == 0 && !hasDup) hasDup = true;
if (compare > 0) {
break;
} else {
if (compare == 0 && !hasDup) hasDup = true;
px = p;
p = SL_NODE_GET_FORWARD_POINTER(px, i);
}

View File

@ -57,7 +57,7 @@ class TDSql:
tdLog.notice("'reset query cache' is not supported")
s = 'drop database if exists db'
self.cursor.execute(s)
s = 'create database db'
s = 'create database db days 300'
self.cursor.execute(s)
s = 'use db'
self.cursor.execute(s)

View File

@ -5,7 +5,7 @@ sleep 500
sql connect
print =============== create database
sql create database d0
sql create database d0 days 300
sql use d0
print =============== create super table and child table
@ -254,4 +254,4 @@ endi
#sql select count(*) from car where ts > '2019-05-14 00:00:00' interval(1y, 5d)
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -409,53 +409,53 @@ sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 4 then
print ======$data11
# return -1
return -1
endi
if $data12 != 4 then
print ======$data12
# return -1
return -1
endi
if $data13 != 10 then
print ======$data13
# return -1
return -1
endi
if $data14 != 3 then
print ======$data14
# return -1
return -1
endi
if $data15 != 1 then
print ======$data15
# return -1
return -1
endi
# row 2
if $data21 != 4 then
print ======$data21
# return -1
return -1
endi
if $data22 != 4 then
print ======$data22
# return -1
return -1
endi
if $data23 != 15 then
print ======$data23
# return -1
return -1
endi
if $data24 != 4 then
print ======$data24
# return -1
return -1
endi
if $data25 != 3 then
print ======$data25
# return -1
return -1
endi

View File

@ -368,7 +368,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
/*msg_process(tmqmessage);*/
taos_free_result(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit_sync(tmq, NULL);
}
}

View File

@ -37,10 +37,10 @@ typedef struct {
TdThread thread;
int32_t consumerId;
int32_t ifManualCommit;
//int32_t autoCommitIntervalMs; // 1000 ms
//char autoCommit[8]; // true, false
//char autoOffsetRest[16]; // none, earliest, latest
int32_t ifManualCommit;
// int32_t autoCommitIntervalMs; // 1000 ms
// char autoCommit[8]; // true, false
// char autoOffsetRest[16]; // none, earliest, latest
int32_t ifCheckData;
int64_t expectMsgCnt;
@ -99,21 +99,15 @@ static void printHelp() {
}
void initLogFile() {
time_t now;
struct tm curTime;
char filename[256];
time_t now;
struct tm curTime;
char filename[256];
now = taosTime(NULL);
now = taosTime(NULL);
taosLocalTime(&now, &curTime);
sprintf(filename,"%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt",
configDir,
curTime.tm_year+1900,
curTime.tm_mon+1,
curTime.tm_mday,
curTime.tm_hour,
curTime.tm_min,
curTime.tm_sec);
//sprintf(filename, "%s/../log/tmqlog.txt", configDir);
sprintf(filename, "%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt", configDir, curTime.tm_year + 1900,
curTime.tm_mon + 1, curTime.tm_mday, curTime.tm_hour, curTime.tm_min, curTime.tm_sec);
// sprintf(filename, "%s/../log/tmqlog.txt", configDir);
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
if (NULL == pFile) {
fprintf(stderr, "Failed to open %s for save result\n", filename);
@ -137,9 +131,9 @@ void saveConfigToLogFile() {
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
//taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit);
//taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
//taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
// taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit);
// taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
// taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
taosFprintfFile(g_fp, " Topics: ");
for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
@ -234,17 +228,17 @@ static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable)
while (1) {
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg);
TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg);
taos_print_row(buf, row, fields, numOfFields);
if (0 != g_stConfInfo.showRowFlag) {
if (0 != g_stConfInfo.showRowFlag) {
taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf);
}
totalRows++;
}
@ -276,7 +270,7 @@ void build_consumer(SThreadInfo* pInfo) {
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
//tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
// tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
@ -299,7 +293,7 @@ void build_consumer(SThreadInfo* pInfo) {
pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
return;
}
@ -322,10 +316,10 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName,
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
time_t tTime = taosGetTimestampSec();
time_t tTime = taosGetTimestampSec();
struct tm tm = *taosLocalTime(&tTime, NULL);
taosFprintfFile(g_fp, "# save result: %d-%02d-%02d %02d:%02d:%02d, sql: %s\n", tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, sqlStr);
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, sqlStr);
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
@ -357,11 +351,11 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs++;
if (totalRows >= pInfo->expectMsgCnt) {
taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n");
taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n");
break;
}
} else {
taosFprintfFile(g_fp, "==== delay over time, so break\n");
} else {
taosFprintfFile(g_fp, "==== delay over time, so break\n");
break;
}
}
@ -389,7 +383,7 @@ void* consumeThreadFunc(void* param) {
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
}
tmq_list_destroy(pInfo->topicList);
pInfo->topicList = NULL;
@ -397,17 +391,18 @@ void* consumeThreadFunc(void* param) {
if (pInfo->ifManualCommit) {
taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n");
pPrint("tmq_commit() manual commit when consume end.\n");
tmq_commit(pInfo->tmq, NULL, 0);
pPrint("tmq_commit() manual commit when consume end.\n");
/*tmq_commit(pInfo->tmq, NULL, 0);*/
tmq_commit_sync(pInfo->tmq, NULL);
}
err = tmq_unsubscribe(pInfo->tmq);
if (err) {
pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
pInfo->consumeMsgCnt = -1;
return NULL;
}
err = tmq_consumer_close(pInfo->tmq);
if (err) {
pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
@ -485,9 +480,9 @@ int32_t getConsumeInfo() {
int32_t* lengths = taos_fetch_lengths(pRes);
// set default value
//g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
//memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
//memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
// g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
// memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
// memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
for (int i = 0; i < num_fields; ++i) {
if (row[i] == NULL || 0 == i) {

View File

@ -195,16 +195,16 @@ void shellRunSingleCommandImp(char *command) {
et = taosGetTimestampUs();
if (error_no == 0) {
printf("Query OK, %d row(s) in set (%.6fs)\n", numOfRows, (et - st) / 1E6);
printf("Query OK, %d rows affected (%.6fs)\n", numOfRows, (et - st) / 1E6);
} else {
printf("Query interrupted (%s), %d row(s) in set (%.6fs)\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6);
printf("Query interrupted (%s), %d rows affected (%.6fs)\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6);
}
taos_free_result(pSql);
} else {
int32_t num_rows_affacted = taos_affected_rows(pSql);
taos_free_result(pSql);
et = taosGetTimestampUs();
printf("Query OK, %d of %d row(s) in database (%.6fs)\n", num_rows_affacted, num_rows_affacted, (et - st) / 1E6);
printf("Query OK, %d of %d rows affected (%.6fs)\n", num_rows_affacted, num_rows_affacted, (et - st) / 1E6);
}
printf("\n");