Merge branch 'develop' into test/jenkins
This commit is contained in:
commit
cafea44a95
|
@ -32,7 +32,7 @@ ELSEIF (TD_WINDOWS)
|
|||
#INSTALL(TARGETS taos RUNTIME DESTINATION driver)
|
||||
#INSTALL(TARGETS shell RUNTIME DESTINATION .)
|
||||
IF (TD_MVN_INSTALLED)
|
||||
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.14-dist.jar DESTINATION connector/jdbc)
|
||||
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.15-dist.jar DESTINATION connector/jdbc)
|
||||
ENDIF ()
|
||||
ELSEIF (TD_DARWIN)
|
||||
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
|
||||
|
|
|
@ -382,6 +382,7 @@ typedef struct SSqlObj {
|
|||
|
||||
typedef struct SSqlStream {
|
||||
SSqlObj *pSql;
|
||||
const char* dstTable;
|
||||
uint32_t streamId;
|
||||
char listed;
|
||||
bool isProject;
|
||||
|
@ -408,6 +409,8 @@ typedef struct SSqlStream {
|
|||
struct SSqlStream *prev, *next;
|
||||
} SSqlStream;
|
||||
|
||||
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable);
|
||||
|
||||
int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn);
|
||||
void tscInitMsgsFp();
|
||||
|
||||
|
|
|
@ -262,6 +262,11 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
|
|||
SSqlStream *pStream = pObj->streamList;
|
||||
while (pStream) {
|
||||
tstrncpy(pSdesc->sql, pStream->pSql->sqlstr, sizeof(pSdesc->sql));
|
||||
if (pStream->dstTable == NULL) {
|
||||
pSdesc->dstTable[0] = 0;
|
||||
} else {
|
||||
tstrncpy(pSdesc->dstTable, pStream->dstTable, sizeof(pSdesc->dstTable));
|
||||
}
|
||||
pSdesc->streamId = htonl(pStream->streamId);
|
||||
pSdesc->num = htobe64(pStream->num);
|
||||
|
||||
|
|
|
@ -3282,7 +3282,12 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC
|
|||
((pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) ? 1 : 0);
|
||||
|
||||
if (pColFilter->filterstr) {
|
||||
if (pExpr->nSQLOptr != TK_EQ && pExpr->nSQLOptr != TK_NE && pExpr->nSQLOptr != TK_LIKE) {
|
||||
if (pExpr->nSQLOptr != TK_EQ
|
||||
&& pExpr->nSQLOptr != TK_NE
|
||||
&& pExpr->nSQLOptr != TK_ISNULL
|
||||
&& pExpr->nSQLOptr != TK_NOTNULL
|
||||
&& pExpr->nSQLOptr != TK_LIKE
|
||||
) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -535,6 +535,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
|
|||
pStream, pTableMetaInfo->name, pStream->interval.interval, pStream->interval.sliding, starttime, pSql->sqlstr);
|
||||
}
|
||||
|
||||
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) {
|
||||
pStream->dstTable = dstTable;
|
||||
}
|
||||
|
||||
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
|
||||
int64_t stime, void *param, void (*callback)(void *)) {
|
||||
STscObj *pObj = (STscObj *)taos;
|
||||
|
|
|
@ -203,10 +203,10 @@ int32_t tsVersion = 0;
|
|||
|
||||
// log
|
||||
int32_t tsNumOfLogLines = 10000000;
|
||||
int32_t mDebugFlag = 135;
|
||||
int32_t sdbDebugFlag = 135;
|
||||
int32_t mDebugFlag = 131;
|
||||
int32_t sdbDebugFlag = 131;
|
||||
int32_t dDebugFlag = 135;
|
||||
int32_t vDebugFlag = 135;
|
||||
int32_t vDebugFlag = 131;
|
||||
int32_t cDebugFlag = 131;
|
||||
int32_t jniDebugFlag = 131;
|
||||
int32_t odbcDebugFlag = 131;
|
||||
|
@ -220,7 +220,7 @@ int32_t debugFlag = 0;
|
|||
int32_t sDebugFlag = 135;
|
||||
int32_t wDebugFlag = 135;
|
||||
int32_t tsdbDebugFlag = 131;
|
||||
int32_t cqDebugFlag = 135;
|
||||
int32_t cqDebugFlag = 131;
|
||||
|
||||
int32_t (*monStartSystemFp)() = NULL;
|
||||
void (*monStopSystemFp)() = NULL;
|
||||
|
@ -416,7 +416,7 @@ static void doInitGlobalConfig(void) {
|
|||
cfg.option = "arbitrator";
|
||||
cfg.ptr = tsArbitrator;
|
||||
cfg.valType = TAOS_CFG_VTYPE_STRING;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
|
||||
cfg.minValue = 0;
|
||||
cfg.maxValue = 0;
|
||||
cfg.ptrLength = TSDB_EP_LEN;
|
||||
|
@ -901,7 +901,7 @@ static void doInitGlobalConfig(void) {
|
|||
cfg.option = "timezone";
|
||||
cfg.ptr = tsTimezone;
|
||||
cfg.valType = TAOS_CFG_VTYPE_STRING;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
|
||||
cfg.minValue = 0;
|
||||
cfg.maxValue = 0;
|
||||
cfg.ptrLength = tListLen(tsTimezone);
|
||||
|
@ -911,7 +911,7 @@ static void doInitGlobalConfig(void) {
|
|||
cfg.option = "locale";
|
||||
cfg.ptr = tsLocale;
|
||||
cfg.valType = TAOS_CFG_VTYPE_STRING;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
|
||||
cfg.minValue = 0;
|
||||
cfg.maxValue = 0;
|
||||
cfg.ptrLength = tListLen(tsLocale);
|
||||
|
@ -921,7 +921,7 @@ static void doInitGlobalConfig(void) {
|
|||
cfg.option = "charset";
|
||||
cfg.ptr = tsCharset;
|
||||
cfg.valType = TAOS_CFG_VTYPE_STRING;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
|
||||
cfg.minValue = 0;
|
||||
cfg.maxValue = 0;
|
||||
cfg.ptrLength = tListLen(tsCharset);
|
||||
|
|
|
@ -8,7 +8,7 @@ IF (TD_MVN_INSTALLED)
|
|||
ADD_CUSTOM_COMMAND(OUTPUT ${JDBC_CMD_NAME}
|
||||
POST_BUILD
|
||||
COMMAND mvn -Dmaven.test.skip=true install -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
|
||||
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.14-dist.jar ${LIBRARY_OUTPUT_PATH}
|
||||
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.15-dist.jar ${LIBRARY_OUTPUT_PATH}
|
||||
COMMAND mvn -Dmaven.test.skip=true clean -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
|
||||
COMMENT "build jdbc driver")
|
||||
ADD_CUSTOM_TARGET(${JDBC_TARGET_NAME} ALL WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH} DEPENDS ${JDBC_CMD_NAME})
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
|
||||
<groupId>com.taosdata.jdbc</groupId>
|
||||
<artifactId>taos-jdbcdriver</artifactId>
|
||||
<version>2.0.14</version>
|
||||
<version>2.0.15</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>JDBCDriver</name>
|
||||
|
@ -36,7 +36,6 @@
|
|||
</developer>
|
||||
</developers>
|
||||
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>commons-logging</groupId>
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>com.taosdata.jdbc</groupId>
|
||||
<artifactId>taos-jdbcdriver</artifactId>
|
||||
<version>2.0.14</version>
|
||||
<version>2.0.15</version>
|
||||
<packaging>jar</packaging>
|
||||
<name>JDBCDriver</name>
|
||||
<url>https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc</url>
|
||||
|
|
|
@ -81,7 +81,7 @@ public class TSDBStatement implements Statement {
|
|||
}
|
||||
|
||||
if (!this.connector.isUpdateQuery(pSql)) {
|
||||
TSDBResultSet res = new TSDBResultSet(this.connector, resultSetPointer);
|
||||
TSDBResultSet res = new TSDBResultSet(this.connector, resultSetPointer);
|
||||
res.setBatchFetch(this.connection.getBatchFetch());
|
||||
return res;
|
||||
} else {
|
||||
|
@ -125,7 +125,8 @@ public class TSDBStatement implements Statement {
|
|||
}
|
||||
|
||||
public int getMaxFieldSize() throws SQLException {
|
||||
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
|
||||
return 0;
|
||||
// throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
|
||||
}
|
||||
|
||||
public void setMaxFieldSize(int max) throws SQLException {
|
||||
|
@ -218,7 +219,8 @@ public class TSDBStatement implements Statement {
|
|||
}
|
||||
|
||||
public int getFetchDirection() throws SQLException {
|
||||
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
|
||||
return ResultSet.FETCH_FORWARD;
|
||||
// throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -57,6 +57,7 @@ typedef struct SCqObj {
|
|||
uint64_t uid;
|
||||
int32_t tid; // table ID
|
||||
int32_t rowSize; // bytes of a row
|
||||
char * dstTable;
|
||||
char * sqlStr; // SQL string
|
||||
STSchema * pSchema; // pointer to schema array
|
||||
void * pStream;
|
||||
|
@ -185,7 +186,7 @@ void cqStop(void *handle) {
|
|||
pthread_mutex_unlock(&pContext->mutex);
|
||||
}
|
||||
|
||||
void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema *pSchema) {
|
||||
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema) {
|
||||
if (tsEnableStream == 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -195,9 +196,11 @@ void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema *
|
|||
if (pObj == NULL) return NULL;
|
||||
|
||||
pObj->uid = uid;
|
||||
pObj->tid = tid;
|
||||
pObj->sqlStr = malloc(strlen(sqlStr)+1);
|
||||
strcpy(pObj->sqlStr, sqlStr);
|
||||
pObj->tid = sid;
|
||||
if (dstTable != NULL) {
|
||||
pObj->dstTable = strdup(dstTable);
|
||||
}
|
||||
pObj->sqlStr = strdup(sqlStr);
|
||||
|
||||
pObj->pSchema = tdDupSchema(pSchema);
|
||||
pObj->rowSize = schemaTLen(pSchema);
|
||||
|
@ -247,6 +250,7 @@ void cqDrop(void *handle) {
|
|||
|
||||
cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
tdFreeSchema(pObj->pSchema);
|
||||
free(pObj->dstTable);
|
||||
free(pObj->sqlStr);
|
||||
free(pObj);
|
||||
|
||||
|
@ -292,6 +296,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
|
|||
if (pObj->pStream == NULL) {
|
||||
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL);
|
||||
if (pObj->pStream) {
|
||||
tscSetStreamDestTable(pObj->pStream, pObj->dstTable);
|
||||
pContext->num++;
|
||||
cInfo("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
} else {
|
||||
|
|
|
@ -70,7 +70,7 @@ int main(int argc, char *argv[]) {
|
|||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||
|
||||
for (int sid =1; sid<10; ++sid) {
|
||||
cqCreate(pCq, sid, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema);
|
||||
cqCreate(pCq, sid, sid, NULL, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema);
|
||||
}
|
||||
|
||||
tdFreeSchema(pSchema);
|
||||
|
|
|
@ -36,6 +36,14 @@ extern int32_t dDebugFlag;
|
|||
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
|
||||
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
|
||||
|
||||
typedef enum {
|
||||
TSDB_RUN_STATUS_INITIALIZE,
|
||||
TSDB_RUN_STATUS_RUNING,
|
||||
TSDB_RUN_STATUS_STOPPED
|
||||
} SRunStatus;
|
||||
|
||||
SRunStatus dnodeGetRunStatus();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -22,8 +22,8 @@ extern "C" {
|
|||
#include "dnodeInt.h"
|
||||
|
||||
int32_t dnodeInitModules();
|
||||
void dnodeStartModules();
|
||||
void dnodeCleanupModules();
|
||||
bool dnodeStartMnode(SMInfos *pMinfos);
|
||||
void dnodeProcessModuleStatus(uint32_t moduleStatus);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -23,8 +23,8 @@ extern "C" {
|
|||
|
||||
int32_t dnodeInitVnodes();
|
||||
void dnodeCleanupVnodes();
|
||||
int32_t dnodeInitTimer();
|
||||
void dnodeCleanupTimer();
|
||||
int32_t dnodeInitStatusTimer();
|
||||
void dnodeCleanupStatusTimer();
|
||||
void dnodeSendStatusMsgToMnode();
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -237,7 +237,7 @@ PRASE_EPS_OVER:
|
|||
dnodeUpdateEp(dnodeGetDnodeId(), tsLocalEp, tsLocalFqdn, &tsServerPort);
|
||||
#else
|
||||
if (dnodeCheckEpChanged(dnodeGetDnodeId(), tsLocalEp)) {
|
||||
dError("dnode:%d, localEp is changed to %s in dnodeEps.json and need reconfigured", dnodeGetDnodeId(), tsLocalEp);
|
||||
dError("dnode:%d, localEp is different from %s in dnodeEps.json and need reconfigured", dnodeGetDnodeId(), tsLocalEp);
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -121,7 +121,7 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
|
|||
dnodeSendRedirectMsg(pMsg, true);
|
||||
} else {
|
||||
SMnodeMsg *pWrite = mnodeCreateMsg(pMsg);
|
||||
dDebug("msg:%p, app:%p type:%s is put into mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle,
|
||||
dTrace("msg:%p, app:%p type:%s is put into mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle,
|
||||
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
|
||||
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
|
||||
}
|
||||
|
@ -130,7 +130,7 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) {
|
||||
dDebug("msg:%p, app:%p type:%s is freed from mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle,
|
||||
dTrace("msg:%p, app:%p type:%s is freed from mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle,
|
||||
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
|
||||
|
||||
mnodeCleanupMsg(pWrite);
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "os.h"
|
||||
#include "taos.h"
|
||||
#include "tnote.h"
|
||||
#include "ttimer.h"
|
||||
#include "tconfig.h"
|
||||
#include "tfile.h"
|
||||
#include "twal.h"
|
||||
|
@ -39,6 +40,7 @@
|
|||
#include "dnodeShell.h"
|
||||
#include "dnodeTelemetry.h"
|
||||
|
||||
void *tsDnodeTmr = NULL;
|
||||
static SRunStatus tsRunStatus = TSDB_RUN_STATUS_STOPPED;
|
||||
|
||||
static int32_t dnodeInitStorage();
|
||||
|
@ -68,8 +70,8 @@ static SStep tsDnodeSteps[] = {
|
|||
{"dnode-server", dnodeInitServer, dnodeCleanupServer},
|
||||
{"dnode-vnodes", dnodeInitVnodes, dnodeCleanupVnodes},
|
||||
{"dnode-modules", dnodeInitModules, dnodeCleanupModules},
|
||||
{"dnode-tmr", dnodeInitTimer, dnodeCleanupTimer},
|
||||
{"dnode-shell", dnodeInitShell, dnodeCleanupShell},
|
||||
{"dnode-statustmr", dnodeInitStatusTimer,dnodeCleanupStatusTimer},
|
||||
{"dnode-telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry},
|
||||
};
|
||||
|
||||
|
@ -91,6 +93,23 @@ static int32_t dnodeInitComponents() {
|
|||
return dnodeStepInit(tsDnodeSteps, stepSize);
|
||||
}
|
||||
|
||||
static int32_t dnodeInitTmr() {
|
||||
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
|
||||
if (tsDnodeTmr == NULL) {
|
||||
dError("failed to init dnode timer");
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void dnodeCleanupTmr() {
|
||||
if (tsDnodeTmr != NULL) {
|
||||
taosTmrCleanUp(tsDnodeTmr);
|
||||
tsDnodeTmr = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t dnodeInitSystem() {
|
||||
dnodeSetRunStatus(TSDB_RUN_STATUS_INITIALIZE);
|
||||
tscEmbedded = 1;
|
||||
|
@ -100,6 +119,7 @@ int32_t dnodeInitSystem() {
|
|||
taosReadGlobalLogCfg();
|
||||
taosSetCoreDump();
|
||||
taosInitNotes();
|
||||
dnodeInitTmr();
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
|
||||
if (dnodeCreateDir(tsLogDir) < 0) {
|
||||
|
@ -125,7 +145,6 @@ int32_t dnodeInitSystem() {
|
|||
return -1;
|
||||
}
|
||||
|
||||
dnodeStartModules();
|
||||
dnodeSetRunStatus(TSDB_RUN_STATUS_RUNING);
|
||||
|
||||
dInfo("TDengine is initialized successfully");
|
||||
|
@ -136,6 +155,7 @@ int32_t dnodeInitSystem() {
|
|||
void dnodeCleanUpSystem() {
|
||||
if (dnodeGetRunStatus() != TSDB_RUN_STATUS_STOPPED) {
|
||||
dnodeSetRunStatus(TSDB_RUN_STATUS_STOPPED);
|
||||
dnodeCleanupTmr();
|
||||
dnodeCleanupComponents();
|
||||
taos_cleanup();
|
||||
taosCloseLog();
|
||||
|
|
|
@ -97,6 +97,20 @@ void dnodeCleanupModules() {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t dnodeStartModules() {
|
||||
for (EModuleType module = 1; module < TSDB_MOD_MAX; ++module) {
|
||||
if (tsModule[module].enable && tsModule[module].startFp) {
|
||||
int32_t code = (*tsModule[module].startFp)();
|
||||
if (code != 0) {
|
||||
dError("failed to start module:%s, code:%d", tsModule[module].name, code);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t dnodeInitModules() {
|
||||
dnodeAllocModules();
|
||||
|
||||
|
@ -110,17 +124,7 @@ int32_t dnodeInitModules() {
|
|||
}
|
||||
|
||||
dInfo("dnode modules is initialized");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void dnodeStartModules() {
|
||||
for (EModuleType module = 1; module < TSDB_MOD_MAX; ++module) {
|
||||
if (tsModule[module].enable && tsModule[module].startFp) {
|
||||
if ((*tsModule[module].startFp)() != 0) {
|
||||
dError("failed to start module:%s", tsModule[module].name);
|
||||
}
|
||||
}
|
||||
}
|
||||
return dnodeStartModules();
|
||||
}
|
||||
|
||||
void dnodeProcessModuleStatus(uint32_t moduleStatus) {
|
||||
|
|
|
@ -96,7 +96,7 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
|
|||
rspMsg.code = TSDB_CODE_APP_NOT_READY;
|
||||
rpcSendResponse(&rspMsg);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
dDebug("RPC %p, msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]);
|
||||
dTrace("RPC %p, msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -151,7 +151,7 @@ void dnodeCleanupClient() {
|
|||
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
|
||||
if (dnodeGetRunStatus() == TSDB_RUN_STATUS_STOPPED) {
|
||||
if (pMsg == NULL || pMsg->pCont == NULL) return;
|
||||
dDebug("msg:%p is ignored since dnode is stopping", pMsg);
|
||||
dTrace("msg:%p is ignored since dnode is stopping", pMsg);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -126,14 +126,14 @@ static void *dnodeProcessMgmtQueue(void *param) {
|
|||
}
|
||||
|
||||
pMsg = &pMgmt->rpcMsg;
|
||||
dDebug("msg:%p, ahandle:%p type:%s will be processed", pMgmt, pMsg->ahandle, taosMsg[pMsg->msgType]);
|
||||
dTrace("msg:%p, ahandle:%p type:%s will be processed", pMgmt, pMsg->ahandle, taosMsg[pMsg->msgType]);
|
||||
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
|
||||
rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
|
||||
} else {
|
||||
rsp.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
|
||||
}
|
||||
|
||||
dDebug("msg:%p, is processed, code:0x%x", pMgmt, rsp.code);
|
||||
dTrace("msg:%p, is processed, code:0x%x", pMgmt, rsp.code);
|
||||
if (rsp.code != TSDB_CODE_DND_ACTION_IN_PROGRESS) {
|
||||
rsp.handle = pMsg->handle;
|
||||
rsp.pCont = NULL;
|
||||
|
|
|
@ -30,39 +30,28 @@ typedef struct {
|
|||
int32_t * vnodeList;
|
||||
} SOpenVnodeThread;
|
||||
|
||||
void * tsDnodeTmr = NULL;
|
||||
extern void * tsDnodeTmr;
|
||||
static void * tsStatusTimer = NULL;
|
||||
static uint32_t tsRebootTime = 0;
|
||||
|
||||
static void dnodeSendStatusMsg(void *handle, void *tmrId);
|
||||
static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
|
||||
|
||||
int32_t dnodeInitTimer() {
|
||||
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
|
||||
if (tsDnodeTmr == NULL) {
|
||||
dError("failed to init dnode timer");
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t dnodeInitStatusTimer() {
|
||||
dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
|
||||
|
||||
tsRebootTime = taosGetTimestampSec();
|
||||
taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
|
||||
|
||||
dInfo("dnode timer is initialized");
|
||||
dInfo("dnode status timer is initialized");
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void dnodeCleanupTimer() {
|
||||
void dnodeCleanupStatusTimer() {
|
||||
if (tsStatusTimer != NULL) {
|
||||
taosTmrStopA(&tsStatusTimer);
|
||||
tsStatusTimer = NULL;
|
||||
}
|
||||
|
||||
if (tsDnodeTmr != NULL) {
|
||||
taosTmrCleanUp(tsDnodeTmr);
|
||||
tsDnodeTmr = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
|
||||
|
|
|
@ -29,13 +29,6 @@ typedef struct {
|
|||
int32_t httpReqNum;
|
||||
} SStatisInfo;
|
||||
|
||||
typedef enum {
|
||||
TSDB_RUN_STATUS_INITIALIZE,
|
||||
TSDB_RUN_STATUS_RUNING,
|
||||
TSDB_RUN_STATUS_STOPPED
|
||||
} SRunStatus;
|
||||
|
||||
SRunStatus dnodeGetRunStatus();
|
||||
SStatisInfo dnodeGetStatisInfo();
|
||||
|
||||
bool dnodeIsFirstDeploy();
|
||||
|
|
|
@ -787,6 +787,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
char sql[TSDB_SHOW_SQL_LEN];
|
||||
char dstTable[TSDB_TABLE_NAME_LEN];
|
||||
uint32_t streamId;
|
||||
int64_t num; // number of computing/cycles
|
||||
int64_t useconds;
|
||||
|
|
|
@ -42,7 +42,7 @@ void cqStart(void *handle);
|
|||
void cqStop(void *handle);
|
||||
|
||||
// cqCreate is called by TSDB to start an instance of CQ
|
||||
void *cqCreate(void *handle, uint64_t uid, int32_t sid, char *sqlStr, STSchema *pSchema);
|
||||
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema);
|
||||
|
||||
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
|
||||
void cqDrop(void *handle);
|
||||
|
|
|
@ -48,7 +48,7 @@ typedef struct {
|
|||
void *cqH;
|
||||
int (*notifyStatus)(void *, int status, int eno);
|
||||
int (*eventCallBack)(void *);
|
||||
void *(*cqCreateFunc)(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema);
|
||||
void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema);
|
||||
void (*cqDropFunc)(void *handle);
|
||||
} STsdbAppH;
|
||||
|
||||
|
|
|
@ -45,6 +45,10 @@ void printHelp() {
|
|||
printf("%s%s%s\n", indent, indent, "Database to use when connecting to the server.");
|
||||
printf("%s%s\n", indent, "-t");
|
||||
printf("%s%s%s\n", indent, indent, "Time zone of the shell, default is local.");
|
||||
printf("%s%s\n", indent, "-n");
|
||||
printf("%s%s%s\n", indent, indent, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup.");
|
||||
printf("%s%s\n", indent, "-l");
|
||||
printf("%s%s%s\n", indent, indent, "Packet length used for net test, default is 1000 bytes.");
|
||||
|
||||
exit(EXIT_SUCCESS);
|
||||
}
|
||||
|
@ -137,6 +141,24 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
|
|||
exit(EXIT_FAILURE);
|
||||
}
|
||||
}
|
||||
// For time zone
|
||||
else if (strcmp(argv[i], "-n") == 0) {
|
||||
if (i < argc - 1) {
|
||||
arguments->netTestRole = argv[++i];
|
||||
} else {
|
||||
fprintf(stderr, "option -n requires an argument\n");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
}
|
||||
// For time zone
|
||||
else if (strcmp(argv[i], "-l") == 0) {
|
||||
if (i < argc - 1) {
|
||||
arguments->pktLen = atoi(argv[++i]);
|
||||
} else {
|
||||
fprintf(stderr, "option -l requires an argument\n");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
}
|
||||
// For temperory command TODO
|
||||
else if (strcmp(argv[i], "--help") == 0) {
|
||||
printHelp();
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -450,6 +450,12 @@ static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
|
|||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "dest table");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "ip:port");
|
||||
|
@ -524,6 +530,10 @@ static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, v
|
|||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->dstTable, pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port);
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]);
|
||||
|
|
|
@ -396,14 +396,15 @@ static void mnodeAddTableIntoStable(SSTableObj *pStable, SCTableObj *pCtable) {
|
|||
atomic_add_fetch_32(&pStable->numOfTables, 1);
|
||||
|
||||
if (pStable->vgHash == NULL) {
|
||||
pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
mDebug("table:%s, create hash:%p", pStable->info.tableId, pStable->vgHash);
|
||||
}
|
||||
|
||||
if (pStable->vgHash != NULL) {
|
||||
if (taosHashGet(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)) == NULL) {
|
||||
taosHashPut(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId));
|
||||
mDebug("table:%s, vgId:%d is put into stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
|
||||
(int32_t)taosHashGetSize(pStable->vgHash));
|
||||
mDebug("table:%s, vgId:%d is put into stable hash:%p, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
|
||||
pStable->vgHash, taosHashGetSize(pStable->vgHash));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -416,13 +417,14 @@ static void mnodeRemoveTableFromStable(SSTableObj *pStable, SCTableObj *pCtable)
|
|||
SVgObj *pVgroup = mnodeGetVgroup(pCtable->vgId);
|
||||
if (pVgroup == NULL) {
|
||||
taosHashRemove(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId));
|
||||
mDebug("table:%s, vgId:%d is remove from stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
|
||||
(int32_t)taosHashGetSize(pStable->vgHash));
|
||||
mDebug("table:%s, vgId:%d is remove from stable hash:%p sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId,
|
||||
pStable->vgHash, taosHashGetSize(pStable->vgHash));
|
||||
}
|
||||
mnodeDecVgroupRef(pVgroup);
|
||||
}
|
||||
|
||||
static void mnodeDestroySuperTable(SSTableObj *pStable) {
|
||||
mDebug("table:%s, is destroyed, stable hash:%p", pStable->info.tableId, pStable->vgHash);
|
||||
if (pStable->vgHash != NULL) {
|
||||
taosHashCleanup(pStable->vgHash);
|
||||
pStable->vgHash = NULL;
|
||||
|
@ -464,6 +466,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) {
|
|||
SSTableObj *pNew = pRow->pObj;
|
||||
SSTableObj *pTable = mnodeGetSuperTable(pNew->info.tableId);
|
||||
if (pTable != NULL && pTable != pNew) {
|
||||
mDebug("table:%s, will be updated, hash:%p sizeOfVgList:%d, new hash:%p sizeOfVgList:%d", pTable->info.tableId,
|
||||
pTable->vgHash, taosHashGetSize(pTable->vgHash), pNew->vgHash, taosHashGetSize(pNew->vgHash));
|
||||
|
||||
void *oldTableId = pTable->info.tableId;
|
||||
void *oldSchema = pTable->schema;
|
||||
void *oldVgHash = pTable->vgHash;
|
||||
|
@ -479,6 +484,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) {
|
|||
free(pNew);
|
||||
free(oldTableId);
|
||||
free(oldSchema);
|
||||
|
||||
mDebug("table:%s, update finished, hash:%p sizeOfVgList:%d", pTable->info.tableId, pTable->vgHash,
|
||||
taosHashGetSize(pTable->vgHash));
|
||||
}
|
||||
|
||||
mnodeDecTableRef(pTable);
|
||||
|
@ -783,8 +791,8 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
|
|||
|
||||
if (pMsg->pTable->type == TSDB_SUPER_TABLE) {
|
||||
SSTableObj *pSTable = (SSTableObj *)pMsg->pTable;
|
||||
mInfo("msg:%p, app:%p table:%s, start to drop stable, uid:%" PRIu64 ", numOfChildTables:%d, sizeOfVgList:%d",
|
||||
pMsg, pMsg->rpcMsg.ahandle, pDrop->tableId, pSTable->uid, pSTable->numOfTables, (int32_t)taosHashGetSize(pSTable->vgHash));
|
||||
mInfo("msg:%p, app:%p table:%s, start to drop stable, uid:%" PRIu64 ", numOfChildTables:%d, sizeOfVgList:%d", pMsg,
|
||||
pMsg->rpcMsg.ahandle, pDrop->tableId, pSTable->uid, pSTable->numOfTables, taosHashGetSize(pSTable->vgHash));
|
||||
return mnodeProcessDropSuperTableMsg(pMsg);
|
||||
} else {
|
||||
SCTableObj *pCTable = (SCTableObj *)pMsg->pTable;
|
||||
|
@ -925,7 +933,10 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR;
|
||||
|
||||
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
|
||||
if (pStable->vgHash != NULL /*pStable->numOfTables != 0*/) {
|
||||
mInfo("msg:%p, app:%p stable:%s will be dropped, hash:%p sizeOfVgList:%d", pMsg, pMsg->rpcMsg.ahandle,
|
||||
pStable->info.tableId, pStable->vgHash, taosHashGetSize(pStable->vgHash));
|
||||
|
||||
if (pStable->vgHash != NULL /*pStable->numOfTables != 0*/) {
|
||||
int32_t *pVgId = taosHashIterate(pStable->vgHash, NULL);
|
||||
while (pVgId) {
|
||||
SVgObj *pVgroup = mnodeGetVgroup(*pVgId);
|
||||
|
@ -938,8 +949,9 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
pDrop->uid = htobe64(pStable->uid);
|
||||
mnodeExtractTableName(pStable->info.tableId, pDrop->tableId);
|
||||
|
||||
mInfo("msg:%p, app:%p stable:%s, send drop stable msg to vgId:%d", pMsg, pMsg->rpcMsg.ahandle,
|
||||
pStable->info.tableId, pVgroup->vgId);
|
||||
mInfo("msg:%p, app:%p stable:%s, send drop stable msg to vgId:%d, hash:%p sizeOfVgList:%d", pMsg,
|
||||
pMsg->rpcMsg.ahandle, pStable->info.tableId, pVgroup->vgId, pStable->vgHash,
|
||||
taosHashGetSize(pStable->vgHash));
|
||||
SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pVgroup);
|
||||
SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE};
|
||||
dnodeSendMsgToDnode(&epSet, &rpcMsg);
|
||||
|
@ -1482,8 +1494,8 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
|
|||
|
||||
pMsg->rpcRsp.rsp = pMeta;
|
||||
|
||||
mDebug("msg:%p, app:%p stable:%s, uid:%" PRIu64 " table meta is retrieved", pMsg, pMsg->rpcMsg.ahandle,
|
||||
pTable->info.tableId, pTable->uid);
|
||||
mDebug("msg:%p, app:%p stable:%s, uid:%" PRIu64 " table meta is retrieved, sizeOfVgList:%d numOfTables:%d", pMsg,
|
||||
pMsg->rpcMsg.ahandle, pTable->info.tableId, pTable->uid, taosHashGetSize(pTable->vgHash), pTable->numOfTables);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1512,7 +1524,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
|
|||
char *msg = (char *)pRsp + sizeof(SSTableVgroupRspMsg);
|
||||
|
||||
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||
char * stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
|
||||
char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
|
||||
SSTableObj *pTable = mnodeGetSuperTable(stableName);
|
||||
if (pTable == NULL) {
|
||||
mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, stableName);
|
||||
|
@ -1533,6 +1545,8 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
|
|||
msg += sizeof(SVgroupsMsg);
|
||||
} else {
|
||||
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
|
||||
mDebug("msg:%p, app:%p stable:%s, hash:%p sizeOfVgList:%d will be returned", pMsg, pMsg->rpcMsg.ahandle,
|
||||
pTable->info.tableId, pTable->vgHash, taosHashGetSize(pTable->vgHash));
|
||||
|
||||
int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL);
|
||||
int32_t vgSize = 0;
|
||||
|
|
|
@ -79,7 +79,7 @@ bool httpInitContexts() {
|
|||
void httpCleanupContexts() {
|
||||
if (tsHttpServer.contextCache != NULL) {
|
||||
SCacheObj *cache = tsHttpServer.contextCache;
|
||||
httpInfo("context cache is cleanuping, size:%" PRIzu "", taosHashGetSize(cache->pHashTable));
|
||||
httpInfo("context cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
|
||||
taosCacheCleanup(tsHttpServer.contextCache);
|
||||
tsHttpServer.contextCache = NULL;
|
||||
}
|
||||
|
|
|
@ -107,7 +107,7 @@ static void httpDestroySession(void *data) {
|
|||
void httpCleanUpSessions() {
|
||||
if (tsHttpServer.sessionCache != NULL) {
|
||||
SCacheObj *cache = tsHttpServer.sessionCache;
|
||||
httpInfo("session cache is cleanuping, size:%" PRIzu "", taosHashGetSize(cache->pHashTable));
|
||||
httpInfo("session cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
|
||||
taosCacheCleanup(tsHttpServer.sessionCache);
|
||||
tsHttpServer.sessionCache = NULL;
|
||||
}
|
||||
|
|
|
@ -872,7 +872,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) {
|
|||
for (int i = 0; i < pMeta->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
|
||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
|
||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
|
||||
tsdbGetTableSchemaImpl(pTable, false, false, -1));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -828,7 +828,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
|
|||
|
||||
if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
|
||||
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) {
|
||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
|
||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
|
||||
tsdbGetTableSchemaImpl(pTable, false, false, -1));
|
||||
}
|
||||
|
||||
|
|
|
@ -82,7 +82,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
|
|||
* @param pHashObj
|
||||
* @return
|
||||
*/
|
||||
size_t taosHashGetSize(const SHashObj *pHashObj);
|
||||
int32_t taosHashGetSize(const SHashObj *pHashObj);
|
||||
|
||||
/**
|
||||
* put element into hash table, if the element with the same key exists, update it
|
||||
|
|
|
@ -189,7 +189,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
|
|||
return pHashObj;
|
||||
}
|
||||
|
||||
size_t taosHashGetSize(const SHashObj *pHashObj) { return (pHashObj == NULL) ? 0 : pHashObj->size; }
|
||||
int32_t taosHashGetSize(const SHashObj *pHashObj) { return (int32_t)((pHashObj == NULL) ? 0 : pHashObj->size); }
|
||||
|
||||
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
|
||||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
|
||||
|
|
|
@ -240,9 +240,6 @@ void taosReadGlobalLogCfg() {
|
|||
int olen, vlen;
|
||||
char fileName[PATH_MAX] = {0};
|
||||
|
||||
mDebugFlag = 135;
|
||||
sdbDebugFlag = 135;
|
||||
|
||||
wordexp_t full_path;
|
||||
if ( 0 != wordexp(configDir, &full_path, 0)) {
|
||||
printf("\nconfig file: %s wordexp fail! reason:%s\n", configDir, strerror(errno));
|
||||
|
|
|
@ -43,12 +43,13 @@ static void *taosNetBindUdpPort(void *sarg) {
|
|||
char buffer[BUFFER_SIZE];
|
||||
int32_t iDataNum;
|
||||
socklen_t sin_size;
|
||||
int32_t bufSize = 1024000;
|
||||
|
||||
struct sockaddr_in server_addr;
|
||||
struct sockaddr_in clientAddr;
|
||||
|
||||
if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
|
||||
uError("failed to create udp socket since %s", strerror(errno));
|
||||
uError("failed to create UDP socket since %s", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -58,11 +59,23 @@ static void *taosNetBindUdpPort(void *sarg) {
|
|||
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
|
||||
|
||||
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
|
||||
uError("failed to bind udp port:%d since %s", port, strerror(errno));
|
||||
uError("failed to bind UDP port:%d since %s", port, strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
uInfo("udp server at port:%d is listening", port);
|
||||
if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
|
||||
uError("failed to set the send buffer size for UDP socket\n");
|
||||
taosCloseSocket(serverSocket);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
|
||||
uError("failed to set the receive buffer size for UDP socket\n");
|
||||
taosCloseSocket(serverSocket);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
uInfo("UDP server at port:%d is listening", port);
|
||||
|
||||
while (1) {
|
||||
memset(buffer, 0, BUFFER_SIZE);
|
||||
|
@ -74,10 +87,13 @@ static void *taosNetBindUdpPort(void *sarg) {
|
|||
continue;
|
||||
}
|
||||
|
||||
uInfo("UDP: recv:%d bytes from %s at %d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
|
||||
|
||||
if (iDataNum > 0) {
|
||||
uInfo("UDP: recv:%d bytes from %s:%d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
|
||||
sendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int32_t)sin_size);
|
||||
iDataNum = taosSendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int32_t)sin_size);
|
||||
}
|
||||
|
||||
uInfo("UDP: send:%d bytes to %s at %d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
|
||||
}
|
||||
|
||||
taosCloseSocket(serverSocket);
|
||||
|
@ -94,10 +110,9 @@ static void *taosNetBindTcpPort(void *sarg) {
|
|||
int32_t addr_len = sizeof(clientAddr);
|
||||
SOCKET client;
|
||||
char buffer[BUFFER_SIZE];
|
||||
int32_t iDataNum = 0;
|
||||
|
||||
if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
|
||||
uError("failed to create tcp socket since %s", strerror(errno));
|
||||
uError("failed to create TCP socket since %s", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -106,130 +121,103 @@ static void *taosNetBindTcpPort(void *sarg) {
|
|||
server_addr.sin_port = htons(port);
|
||||
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
|
||||
|
||||
int32_t reuse = 1;
|
||||
if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
|
||||
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
|
||||
taosCloseSocket(serverSocket);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
|
||||
uError("failed to bind tcp port:%d since %s", port, strerror(errno));
|
||||
uError("failed to bind TCP port:%d since %s", port, strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (listen(serverSocket, 5) < 0) {
|
||||
uError("failed to listen tcp port:%d since %s", port, strerror(errno));
|
||||
|
||||
if (taosKeepTcpAlive(serverSocket) < 0) {
|
||||
uError("failed to set tcp server keep-alive option since %s", strerror(errno));
|
||||
taosCloseSocket(serverSocket);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
uInfo("tcp server at port:%d is listening", port);
|
||||
if (listen(serverSocket, 10) < 0) {
|
||||
uError("failed to listen TCP port:%d since %s", port, strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
uInfo("TCP server at port:%d is listening", port);
|
||||
|
||||
while (1) {
|
||||
client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len);
|
||||
if (client < 0) {
|
||||
uDebug("failed to accept from tcp port:%d since %s", port, strerror(errno));
|
||||
uDebug("TCP: failed to accept at port:%d since %s", port, strerror(errno));
|
||||
continue;
|
||||
}
|
||||
|
||||
iDataNum = 0;
|
||||
memset(buffer, 0, BUFFER_SIZE);
|
||||
int32_t nleft, nread;
|
||||
char * ptr = buffer;
|
||||
nleft = pinfo->pktLen;
|
||||
|
||||
while (nleft > 0) {
|
||||
nread = recv(client, ptr, BUFFER_SIZE, 0);
|
||||
|
||||
if (nread == 0) {
|
||||
break;
|
||||
} else if (nread < 0) {
|
||||
if (errno == EINTR) {
|
||||
continue;
|
||||
} else {
|
||||
uError("failed to perform recv func at %d since %s", port, strerror(errno));
|
||||
taosCloseSocket(serverSocket);
|
||||
return NULL;
|
||||
}
|
||||
} else {
|
||||
nleft -= nread;
|
||||
ptr += nread;
|
||||
iDataNum += nread;
|
||||
}
|
||||
int32_t ret = taosReadMsg(client, buffer, pinfo->pktLen);
|
||||
if (ret < 0 || ret != pinfo->pktLen) {
|
||||
uError("TCP: failed to read %d bytes at port:%d since %s", pinfo->pktLen, port, strerror(errno));
|
||||
taosCloseSocket(serverSocket);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (iDataNum > 0) {
|
||||
uInfo("TCP: recv:%d bytes from %s:%d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
|
||||
send(client, buffer, iDataNum, 0);
|
||||
uInfo("TCP: read:%d bytes from %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port);
|
||||
|
||||
ret = taosWriteMsg(client, buffer, pinfo->pktLen);
|
||||
if (ret < 0) {
|
||||
uError("TCP: failed to write %d bytes at %d since %s", pinfo->pktLen, port, strerror(errno));
|
||||
taosCloseSocket(serverSocket);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
uInfo("TCP: write:%d bytes to %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port);
|
||||
}
|
||||
|
||||
|
||||
taosCloseSocket(serverSocket);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int32_t taosNetCheckTcpPort(STestInfo *info) {
|
||||
SOCKET clientSocket;
|
||||
char sendbuf[BUFFER_SIZE];
|
||||
char recvbuf[BUFFER_SIZE];
|
||||
int32_t iDataNum = 0;
|
||||
SOCKET clientSocket;
|
||||
char buffer[BUFFER_SIZE] = {0};
|
||||
|
||||
struct sockaddr_in serverAddr;
|
||||
if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
|
||||
uError("failed to create tcp client socket since %s", strerror(errno));
|
||||
uError("failed to create TCP client socket since %s", strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
// set send and recv overtime
|
||||
struct timeval timeout;
|
||||
timeout.tv_sec = 2; // s
|
||||
timeout.tv_usec = 0; // us
|
||||
if (setsockopt(clientSocket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
|
||||
uError("failed to setsockopt send timer since %s", strerror(errno));
|
||||
}
|
||||
if (setsockopt(clientSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
|
||||
uError("failed to setsockopt recv timer since %s", strerror(errno));
|
||||
int32_t reuse = 1;
|
||||
if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
|
||||
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
|
||||
taosCloseSocket(clientSocket);
|
||||
return -1;
|
||||
}
|
||||
|
||||
struct sockaddr_in serverAddr;
|
||||
memset((char *)&serverAddr, 0, sizeof(serverAddr));
|
||||
serverAddr.sin_family = AF_INET;
|
||||
serverAddr.sin_port = htons(info->port);
|
||||
serverAddr.sin_port = (uint16_t)htons((uint16_t)info->port);
|
||||
serverAddr.sin_addr.s_addr = info->hostIp;
|
||||
|
||||
if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
|
||||
uError("failed to connect port:%d since %s", info->port, strerror(errno));
|
||||
uError("TCP: failed to connect port %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
memset(sendbuf, 0, BUFFER_SIZE);
|
||||
memset(recvbuf, 0, BUFFER_SIZE);
|
||||
taosKeepTcpAlive(clientSocket);
|
||||
|
||||
struct in_addr ipStr;
|
||||
memcpy(&ipStr, &info->hostIp, 4);
|
||||
sprintf(sendbuf, "client send tcp pkg to %s:%d, content: 1122334455", taosInetNtoa(ipStr), info->port);
|
||||
sprintf(sendbuf + info->pktLen - 16, "1122334455667788");
|
||||
sprintf(buffer, "client send TCP pkg to %s:%d, content: 1122334455", taosIpStr(info->hostIp), info->port);
|
||||
sprintf(buffer + info->pktLen - 16, "1122334455667788");
|
||||
|
||||
send(clientSocket, sendbuf, info->pktLen, 0);
|
||||
|
||||
memset(recvbuf, 0, BUFFER_SIZE);
|
||||
int32_t nleft, nread;
|
||||
char * ptr = recvbuf;
|
||||
nleft = info->pktLen;
|
||||
|
||||
while (nleft > 0) {
|
||||
nread = recv(clientSocket, ptr, BUFFER_SIZE, 0);;
|
||||
|
||||
if (nread == 0) {
|
||||
break;
|
||||
} else if (nread < 0) {
|
||||
if (errno == EINTR) {
|
||||
continue;
|
||||
} else {
|
||||
uError("faild to recv pkg from TCP port:%d since %s", info->port, strerror(errno));
|
||||
taosCloseSocket(clientSocket);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
nleft -= nread;
|
||||
ptr += nread;
|
||||
iDataNum += nread;
|
||||
}
|
||||
int32_t ret = taosWriteMsg(clientSocket, buffer, info->pktLen);
|
||||
if (ret < 0) {
|
||||
uError("TCP: failed to write msg to %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (iDataNum < info->pktLen) {
|
||||
uError("TCP: received ack:%d bytes, less than send:%d bytes from port:%d", iDataNum, info->pktLen, info->port);
|
||||
ret = taosReadMsg(clientSocket, buffer, info->pktLen);
|
||||
if (ret < 0) {
|
||||
uError("TCP: failed to read msg from %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -239,9 +227,9 @@ static int32_t taosNetCheckTcpPort(STestInfo *info) {
|
|||
|
||||
static int32_t taosNetCheckUdpPort(STestInfo *info) {
|
||||
SOCKET clientSocket;
|
||||
char sendbuf[BUFFER_SIZE];
|
||||
char recvbuf[BUFFER_SIZE];
|
||||
char buffer[BUFFER_SIZE] = {0};
|
||||
int32_t iDataNum = 0;
|
||||
int32_t bufSize = 1024000;
|
||||
|
||||
struct sockaddr_in serverAddr;
|
||||
|
||||
|
@ -250,41 +238,39 @@ static int32_t taosNetCheckUdpPort(STestInfo *info) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// set overtime
|
||||
struct timeval timeout;
|
||||
timeout.tv_sec = 2; // s
|
||||
timeout.tv_usec = 0; // us
|
||||
if (setsockopt(clientSocket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
|
||||
uError("failed to setsockopt send timer since %s", strerror(errno));
|
||||
if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
|
||||
uError("failed to set the send buffer size for UDP socket\n");
|
||||
return -1;
|
||||
}
|
||||
if (setsockopt(clientSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
|
||||
uError("failed to setsockopt recv timer since %s", strerror(errno));
|
||||
|
||||
if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
|
||||
uError("failed to set the receive buffer size for UDP socket\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
serverAddr.sin_family = AF_INET;
|
||||
serverAddr.sin_port = htons(info->port);
|
||||
serverAddr.sin_addr.s_addr = info->hostIp;
|
||||
|
||||
memset(sendbuf, 0, BUFFER_SIZE);
|
||||
memset(recvbuf, 0, BUFFER_SIZE);
|
||||
|
||||
struct in_addr ipStr;
|
||||
memcpy(&ipStr, &info->hostIp, 4);
|
||||
sprintf(sendbuf, "client send udp pkg to %s:%d, content: 1122334455", taosInetNtoa(ipStr), info->port);
|
||||
sprintf(sendbuf + info->pktLen - 16, "1122334455667788");
|
||||
sprintf(buffer, "client send udp pkg to %s:%d, content: 1122334455", taosInetNtoa(ipStr), info->port);
|
||||
sprintf(buffer + info->pktLen - 16, "1122334455667788");
|
||||
|
||||
socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr);
|
||||
|
||||
int32_t code = sendto(clientSocket, sendbuf, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int32_t)sin_size);
|
||||
if (code < 0) {
|
||||
uError("failed to perform sendto func since %s", strerror(errno));
|
||||
iDataNum = taosSendto(clientSocket, buffer, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int32_t)sin_size);
|
||||
if (iDataNum < 0 || iDataNum != info->pktLen) {
|
||||
uError("UDP: failed to perform sendto func since %s", strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size);
|
||||
memset(buffer, 0, BUFFER_SIZE);
|
||||
sin_size = sizeof(*(struct sockaddr *)&serverAddr);
|
||||
iDataNum = recvfrom(clientSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size);
|
||||
|
||||
if (iDataNum < info->pktLen) {
|
||||
uError("UDP: received ack:%d bytes, less than send:%d bytes from port:%d", iDataNum, info->pktLen, info->port);
|
||||
if (iDataNum < 0 || iDataNum != info->pktLen) {
|
||||
uError("UDP: received ack:%d bytes(expect:%d) from port:%d since %s", iDataNum, info->pktLen, info->port, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -304,19 +290,18 @@ static void taosNetCheckPort(uint32_t hostIp, int32_t startPort, int32_t endPort
|
|||
info.port = port;
|
||||
ret = taosNetCheckTcpPort(&info);
|
||||
if (ret != 0) {
|
||||
uError("failed to test tcp port:%d", port);
|
||||
uError("failed to test TCP port:%d", port);
|
||||
} else {
|
||||
uInfo("successed to test tcp port:%d", port);
|
||||
uInfo("successed to test TCP port:%d", port);
|
||||
}
|
||||
|
||||
ret = taosNetCheckUdpPort(&info);
|
||||
if (ret != 0) {
|
||||
uError("failed to test udp port:%d", port);
|
||||
uError("failed to test UDP port:%d", port);
|
||||
} else {
|
||||
uInfo("successed to test udp port:%d", port);
|
||||
uInfo("successed to test UDP port:%d", port);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
void *taosNetInitRpc(char *secretEncrypt, char spi) {
|
||||
|
@ -440,9 +425,9 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
|
|||
|
||||
int32_t ret = taosNetCheckRpc(host, port, sendpkgLen, spi, NULL);
|
||||
if (ret < 0) {
|
||||
uError("failed to test tcp port:%d", port);
|
||||
uError("failed to test TCP port:%d", port);
|
||||
} else {
|
||||
uInfo("successed to test tcp port:%d", port);
|
||||
uInfo("successed to test TCP port:%d", port);
|
||||
}
|
||||
|
||||
if (pkgLen >= tsRpcMaxUdpSize) {
|
||||
|
@ -453,9 +438,9 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
|
|||
|
||||
ret = taosNetCheckRpc(host, port, pkgLen, spi, NULL);
|
||||
if (ret < 0) {
|
||||
uError("failed to test udp port:%d", port);
|
||||
uError("failed to test UDP port:%d", port);
|
||||
} else {
|
||||
uInfo("successed to test udp port:%d", port);
|
||||
uInfo("successed to test UDP port:%d", port);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -492,14 +477,15 @@ static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) {
|
|||
tcpInfo->pktLen = pkgLen;
|
||||
|
||||
if (pthread_create(pids + i, NULL, taosNetBindTcpPort, tcpInfo) != 0) {
|
||||
uInfo("failed to create tcp test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
|
||||
uInfo("failed to create TCP test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
STestInfo *udpInfo = uinfos + i;
|
||||
udpInfo->port = (uint16_t)(port + i);
|
||||
udpInfo->port = port + i;
|
||||
tcpInfo->pktLen = pkgLen;
|
||||
if (pthread_create(pids + num + i, NULL, taosNetBindUdpPort, udpInfo) != 0) {
|
||||
uInfo("failed to create udp test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
|
||||
uInfo("failed to create UDP test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
|
||||
exit(-1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -228,7 +228,7 @@ static int32_t taosOpenNoteWithMaxLines(char *fn, int32_t maxLines, int32_t maxN
|
|||
}
|
||||
|
||||
void taosNotePrintBuffer(SNoteObj *pNote, char *buffer, int32_t len) {
|
||||
if (pNote->fd < 0) return;
|
||||
if (pNote->fd <= 0) return;
|
||||
taosWrite(pNote->fd, buffer, len);
|
||||
|
||||
if (pNote->maxLines > 0) {
|
||||
|
|
|
@ -441,7 +441,6 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
|
|||
|
||||
if (status == TSDB_STATUS_COMMIT_START) {
|
||||
pVnode->isCommiting = 1;
|
||||
pVnode->fversion = pVnode->version;
|
||||
vDebug("vgId:%d, start commit, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
|
||||
if (!vnodeInInitStatus(pVnode)) {
|
||||
return walRenew(pVnode->wal);
|
||||
|
@ -450,9 +449,10 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
|
|||
}
|
||||
|
||||
if (status == TSDB_STATUS_COMMIT_OVER) {
|
||||
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
|
||||
pVnode->isCommiting = 0;
|
||||
pVnode->isFull = 0;
|
||||
pVnode->fversion = pVnode->version;
|
||||
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
|
||||
if (!vnodeInInitStatus(pVnode)) {
|
||||
walRemoveOneOldFile(pVnode->wal);
|
||||
}
|
||||
|
|
|
@ -166,14 +166,14 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
|
|||
char walName[WAL_FILE_LEN];
|
||||
snprintf(walName, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
|
||||
|
||||
wDebug("vgId:%d, file:%s, will be restored", pWal->vgId, walName);
|
||||
wInfo("vgId:%d, file:%s, will be restored", pWal->vgId, walName);
|
||||
int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, walName, fileId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
wError("vgId:%d, file:%s, failed to restore since %s", pWal->vgId, walName, tstrerror(code));
|
||||
continue;
|
||||
}
|
||||
|
||||
wDebug("vgId:%d, file:%s, restore success", pWal->vgId, walName);
|
||||
wInfo("vgId:%d, file:%s, restore success", pWal->vgId, walName);
|
||||
|
||||
count++;
|
||||
}
|
||||
|
@ -326,7 +326,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
|
|||
|
||||
offset = offset + sizeof(SWalHead) + pHead->len;
|
||||
|
||||
wDebug("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d", pWal->vgId,
|
||||
wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d", pWal->vgId,
|
||||
fileId, pHead->version, pWal->version, pHead->len);
|
||||
|
||||
pWal->version = pHead->version;
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
|
||||
// execute this before anything else, including requesting any time on an agent
|
||||
if (currentBuild.rawBuild.getCauses().toString().contains('BranchIndexingCause')) {
|
||||
print "INFO: Build skipped due to trigger being Branch Indexing"
|
||||
|
@ -7,6 +8,7 @@ if (currentBuild.rawBuild.getCauses().toString().contains('BranchIndexingCause')
|
|||
properties([pipelineTriggers([githubPush()])])
|
||||
node {
|
||||
git url: 'https://github.com/taosdata/TDengine.git'
|
||||
|
||||
}
|
||||
|
||||
def pre_test(){
|
||||
|
|
|
@ -38,11 +38,16 @@
|
|||
<artifactId>h2</artifactId>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>1.1.17</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.taosdata.jdbc</groupId>
|
||||
<artifactId>taos-jdbcdriver</artifactId>
|
||||
<version>2.0.11</version>
|
||||
<version>2.0.14</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
|
||||
|
|
|
@ -1,17 +1,5 @@
|
|||
spring:
|
||||
datasource:
|
||||
# driver-class-name: org.h2.Driver
|
||||
# schema: classpath:db/schema-mysql.sql
|
||||
# data: classpath:db/data-mysql.sql
|
||||
# url: jdbc:h2:mem:test
|
||||
# username: root
|
||||
# password: test
|
||||
|
||||
# driver-class-name: com.mysql.jdbc.Driver
|
||||
# url: jdbc:mysql://master:3306/test?useSSL=false
|
||||
# username: root
|
||||
# password: 123456
|
||||
|
||||
driver-class-name: com.taosdata.jdbc.TSDBDriver
|
||||
url: jdbc:TAOS://localhost:6030/mp_test
|
||||
user: root
|
||||
|
@ -20,6 +8,12 @@ spring:
|
|||
locale: en_US.UTF-8
|
||||
timezone: UTC-8
|
||||
|
||||
druid:
|
||||
initial-size: 5
|
||||
min-idle: 5
|
||||
max-active: 5
|
||||
|
||||
|
||||
mybatis-plus:
|
||||
configuration:
|
||||
map-underscore-to-camel-case: false
|
||||
|
|
|
@ -65,7 +65,18 @@ function runQueryPerfTest {
|
|||
echoInfo "Run Performance Test"
|
||||
cd $WORK_DIR/TDengine/tests/pytest
|
||||
|
||||
python3 query/queryPerformance.py 0 | tee -a $PERFORMANCE_TEST_REPORT
|
||||
python3 query/queryPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
|
||||
|
||||
python3 insert/insertFromCSVPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
|
||||
|
||||
yes | taosdemo -c /etc/taosperf/ -d taosdemo_insert_test -t 1000 -n 1000 > taosdemoperf.txt
|
||||
|
||||
CREATETABLETIME=`grep 'Spent' taosdemoperf.txt | awk 'NR==1{print $2}'`
|
||||
INSERTRECORDSTIME=`grep 'Spent' taosdemoperf.txt | awk 'NR==2{print $2}'`
|
||||
REQUESTSPERSECOND=`grep 'Spent' taosdemoperf.txt | awk 'NR==2{print $13}'`
|
||||
|
||||
python3 tools/taosdemoPerformance.py -c $LOCAL_COMMIT -t $CREATETABLETIME -i $INSERTRECORDSTIME -r $REQUESTSPERSECOND | tee -a $PERFORMANCE_TEST_REPORT
|
||||
[ -f taosdemoperf.txt ] && rm taosdemoperf.txt
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
from util.log import *
|
||||
from util.cases import *
|
||||
from util.sql import *
|
||||
from util.dnodes import *
|
||||
import taos
|
||||
if __name__ == "__main__":
|
||||
|
||||
logSql = True
|
||||
deployPath = ""
|
||||
testCluster = False
|
||||
valgrind = 0
|
||||
|
||||
print("start to execute %s" % __file__)
|
||||
tdDnodes.init(deployPath)
|
||||
tdDnodes.setTestCluster(testCluster)
|
||||
tdDnodes.setValgrind(valgrind)
|
||||
|
||||
tdDnodes.stopAll()
|
||||
tdDnodes.addSimExtraCfg("maxSQLLength", "1048576")
|
||||
tdDnodes.deploy(1)
|
||||
tdDnodes.start(1)
|
||||
host = '127.0.0.1'
|
||||
|
||||
tdLog.info("Procedures for tdengine deployed in %s" % (host))
|
||||
|
||||
tdCases.logSql(logSql)
|
||||
print('1')
|
||||
conn = taos.connect(
|
||||
host,
|
||||
config=tdDnodes.getSimCfgPath())
|
||||
|
||||
tdSql.init(conn.cursor(), True)
|
||||
|
||||
print("==========step1")
|
||||
print("create table ")
|
||||
tdSql.execute("create database db")
|
||||
tdSql.execute("use db")
|
||||
tdSql.execute("create table t1 (ts timestamp, c1 int,c2 int ,c3 int)")
|
||||
|
||||
print("==========step2")
|
||||
print("insert maxSQLLength data ")
|
||||
data = 'insert into t1 values'
|
||||
ts = 1604298064000
|
||||
i = 0
|
||||
while ((len(data)<(1024*1024)) & (i < 32767 - 1) ):
|
||||
data += '(%s,%d,%d,%d)'%(ts+i,i%1000,i%1000,i%1000)
|
||||
i+=1
|
||||
tdSql.execute(data)
|
||||
|
||||
print("==========step4")
|
||||
print("insert data batch larger than 32767 ")
|
||||
i = 0
|
||||
while ((len(data)<(1024*1024)) & (i < 32767) ):
|
||||
data += '(%s,%d,%d,%d)'%(ts+i,i%1000,i%1000,i%1000)
|
||||
i+=1
|
||||
tdSql.error(data)
|
||||
|
||||
print("==========step4")
|
||||
print("insert data larger than maxSQLLength ")
|
||||
tdSql.execute("create table t2 (ts timestamp, c1 binary(50))")
|
||||
data = 'insert into t2 values'
|
||||
i = 0
|
||||
while ((len(data)<(1024*1024)) & (i < 32767 - 1 ) ):
|
||||
data += '(%s,%s)'%(ts+i,'a'*50)
|
||||
i+=1
|
||||
tdSql.error(data)
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
|
||||
|
|
@ -20,13 +20,23 @@ python3 insert/retentionpolicy.py
|
|||
python3 ./test.py -f insert/alterTableAndInsert.py
|
||||
python3 ./test.py -f insert/insertIntoTwoTables.py
|
||||
python3 ./test.py -f insert/before_1970.py
|
||||
python3 bug2265.py
|
||||
|
||||
#table
|
||||
python3 ./test.py -f table/alter_wal0.py
|
||||
python3 ./test.py -f table/column_name.py
|
||||
python3 ./test.py -f table/column_num.py
|
||||
python3 ./test.py -f table/db_table.py
|
||||
python3 ./test.py -f table/create_sensitive.py
|
||||
#python3 ./test.py -f table/tablename-boundary.py
|
||||
python3 ./test.py -f table/max_table_length.py
|
||||
python3 ./test.py -f table/alter_column.py
|
||||
python3 ./test.py -f table/boundary.py
|
||||
python3 ./test.py -f table/create-a-lot.py
|
||||
python3 ./test.py -f table/create.py
|
||||
python3 ./test.py -f table/del_stable.py
|
||||
python3 ./test.py -f table/queryWithTaosdKilled.py
|
||||
|
||||
|
||||
# tag
|
||||
python3 ./test.py -f tag_lite/filter.py
|
||||
|
@ -135,9 +145,6 @@ python3 ./test.py -f user/pass_len.py
|
|||
# stable
|
||||
python3 ./test.py -f stable/query_after_reset.py
|
||||
|
||||
# table
|
||||
python3 ./test.py -f table/del_stable.py
|
||||
|
||||
#query
|
||||
python3 ./test.py -f query/filter.py
|
||||
python3 ./test.py -f query/filterCombo.py
|
||||
|
@ -164,7 +171,8 @@ python3 ./test.py -f query/bug2117.py
|
|||
python3 ./test.py -f query/bug2143.py
|
||||
python3 ./test.py -f query/sliding.py
|
||||
python3 ./test.py -f query/unionAllTest.py
|
||||
|
||||
python3 ./test.py -f query/bug2281.py
|
||||
python3 ./test.py -f query/bug2119.py
|
||||
#stream
|
||||
python3 ./test.py -f stream/metric_1.py
|
||||
python3 ./test.py -f stream/new.py
|
||||
|
|
|
@ -0,0 +1,131 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
import taos
|
||||
import time
|
||||
import datetime
|
||||
import csv
|
||||
import random
|
||||
import pandas as pd
|
||||
import argparse
|
||||
import os.path
|
||||
|
||||
class insertFromCSVPerformace:
|
||||
def __init__(self, commitID, dbName, stbName, branchName):
|
||||
self.commitID = commitID
|
||||
self.dbName = dbName
|
||||
self.stbName = stbName
|
||||
self.branchName = branchName
|
||||
self.ts = 1500074556514
|
||||
self.host = "127.0.0.1"
|
||||
self.user = "root"
|
||||
self.password = "taosdata"
|
||||
self.config = "/etc/taosperf"
|
||||
self.conn = taos.connect(
|
||||
self.host,
|
||||
self.user,
|
||||
self.password,
|
||||
self.config)
|
||||
|
||||
def writeCSV(self):
|
||||
with open('test3.csv','w', encoding='utf-8', newline='') as csvFile:
|
||||
writer = csv.writer(csvFile, dialect='excel')
|
||||
for i in range(1000000):
|
||||
newTimestamp = self.ts + random.randint(10000000, 10000000000) + random.randint(1000, 10000000) + random.randint(1, 1000)
|
||||
d = datetime.datetime.fromtimestamp(newTimestamp / 1000)
|
||||
dt = str(d.strftime("%Y-%m-%d %H:%M:%S.%f"))
|
||||
writer.writerow(["'%s'" % dt, random.randint(1, 100), random.uniform(1, 100), random.randint(1, 100), random.randint(1, 100)])
|
||||
|
||||
def removCSVHeader(self):
|
||||
data = pd.read_csv("ordered.csv")
|
||||
data = data.drop([0])
|
||||
data.to_csv("ordered.csv", header = False, index = False)
|
||||
|
||||
def createTables(self):
|
||||
cursor = self.conn.cursor()
|
||||
|
||||
cursor.execute("create database if not exists %s" % self.dbName)
|
||||
cursor.execute("use %s" % self.dbName)
|
||||
cursor.execute("create table if not exists %s(ts timestamp, in_order_time float, out_of_order_time float, commit_id binary(50)) tags(branch binary(50))" % self.stbName)
|
||||
cursor.execute("create table if not exists %s using %s tags('%s')" % (self.branchName, self.stbName, self.branchName))
|
||||
|
||||
cursor.execute("create table if not exists t1(ts timestamp, c1 int, c2 float, c3 int, c4 int)")
|
||||
cursor.execute("create table if not exists t2(ts timestamp, c1 int, c2 float, c3 int, c4 int)")
|
||||
|
||||
cursor.close()
|
||||
|
||||
def run(self):
|
||||
cursor = self.conn.cursor()
|
||||
cursor.execute("use %s" % self.dbName)
|
||||
print("==================== CSV insert performance ====================")
|
||||
|
||||
totalTime = 0
|
||||
for i in range(10):
|
||||
cursor.execute("create table if not exists t1(ts timestamp, c1 int, c2 float, c3 int, c4 int)")
|
||||
startTime = time.time()
|
||||
cursor.execute("insert into t1 file 'outoforder.csv'")
|
||||
totalTime += time.time() - startTime
|
||||
cursor.execute("drop table if exists t1")
|
||||
out_of_order_time = (float) (totalTime / 10)
|
||||
print("Out of Order - Insert time: %f" % out_of_order_time)
|
||||
|
||||
totalTime = 0
|
||||
for i in range(10):
|
||||
cursor.execute("create table if not exists t2(ts timestamp, c1 int, c2 float, c3 int, c4 int)")
|
||||
startTime = time.time()
|
||||
cursor.execute("insert into t2 file 'ordered.csv'")
|
||||
totalTime += time.time() - startTime
|
||||
cursor.execute("drop table if exists t2")
|
||||
|
||||
in_order_time = (float) (totalTime / 10)
|
||||
print("In order - Insert time: %f" % in_order_time)
|
||||
cursor.execute("insert into %s values(now, %f, %f, '%s')" % (self.branchName, in_order_time, out_of_order_time, self.commitID))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
'-c',
|
||||
'--commit-id',
|
||||
action='store',
|
||||
default='null',
|
||||
type=str,
|
||||
help='git commit id (default: null)')
|
||||
parser.add_argument(
|
||||
'-d',
|
||||
'--database-name',
|
||||
action='store',
|
||||
default='perf',
|
||||
type=str,
|
||||
help='Database name to be created (default: perf)')
|
||||
parser.add_argument(
|
||||
'-t',
|
||||
'--stable-name',
|
||||
action='store',
|
||||
default='csv_insert',
|
||||
type=str,
|
||||
help='Database name to be created (default: csv_insert)')
|
||||
parser.add_argument(
|
||||
'-b',
|
||||
'--branch-name',
|
||||
action='store',
|
||||
default='develop',
|
||||
type=str,
|
||||
help='branch name (default: develop)')
|
||||
|
||||
args = parser.parse_args()
|
||||
perftest = insertFromCSVPerformace(args.commit_id, args.database_name, args.stable_name, args.branch_name)
|
||||
|
||||
perftest.createTables()
|
||||
perftest.run()
|
|
@ -20,12 +20,20 @@ python3 insert/retentionpolicy.py
|
|||
python3 ./test.py -f insert/alterTableAndInsert.py
|
||||
python3 ./test.py -f insert/insertIntoTwoTables.py
|
||||
|
||||
#table
|
||||
python3 ./test.py -f table/alter_wal0.py
|
||||
python3 ./test.py -f table/column_name.py
|
||||
python3 ./test.py -f table/column_num.py
|
||||
python3 ./test.py -f table/db_table.py
|
||||
python3 ./test.py -f table/create_sensitive.py
|
||||
#python3 ./test.py -f table/tablename-boundary.py
|
||||
python3 ./test.py -f table/max_table_length.py
|
||||
python3 ./test.py -f table/alter_column.py
|
||||
python3 ./test.py -f table/boundary.py
|
||||
python3 ./test.py -f table/create-a-lot.py
|
||||
python3 ./test.py -f table/create.py
|
||||
python3 ./test.py -f table/del_stable.py
|
||||
python3 ./test.py -f table/queryWithTaosdKilled.py
|
||||
|
||||
# tag
|
||||
python3 ./test.py -f tag_lite/filter.py
|
||||
|
@ -134,9 +142,6 @@ python3 ./test.py -f user/pass_len.py
|
|||
# stable
|
||||
python3 ./test.py -f stable/query_after_reset.py
|
||||
|
||||
# table
|
||||
python3 ./test.py -f table/del_stable.py
|
||||
|
||||
#query
|
||||
python3 ./test.py -f query/filter.py
|
||||
python3 ./test.py -f query/filterCombo.py
|
||||
|
@ -159,7 +164,9 @@ python3 ./test.py -f query/bug1874.py
|
|||
python3 ./test.py -f query/bug1875.py
|
||||
python3 ./test.py -f query/bug1876.py
|
||||
python3 ./test.py -f query/bug2218.py
|
||||
|
||||
python3 ./test.py -f query/bug2281.py
|
||||
python3 ./test.py -f query/bug2119.py
|
||||
python3 bug2265.py
|
||||
#stream
|
||||
python3 ./test.py -f stream/metric_1.py
|
||||
python3 ./test.py -f stream/new.py
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
from util.log import *
|
||||
from util.cases import *
|
||||
from util.sql import *
|
||||
from util.dnodes import *
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
print("==========step1")
|
||||
print("create table && insert data")
|
||||
|
||||
tdSql.execute("create table t1 (ts timestamp, c1 int, c2 float)")
|
||||
|
||||
|
||||
print("==========step2")
|
||||
print("query percentile from blank table")
|
||||
tdSql.query('select percentile(c1,1) from t1')
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -0,0 +1,47 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
from util.log import *
|
||||
from util.cases import *
|
||||
from util.sql import *
|
||||
from util.dnodes import *
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
print("==========step1")
|
||||
print("create table && insert data")
|
||||
|
||||
tdSql.execute("create table t1 (ts timestamp, c1 int, c2 float)")
|
||||
insertRows = 10
|
||||
t0 = 1604298064000
|
||||
tdLog.info("insert %d rows" % (insertRows))
|
||||
for i in range(insertRows):
|
||||
ret = tdSql.execute(
|
||||
"insert into t1 values (%d , %d,%d)" %
|
||||
(t0+i,i%100,i/2.0))
|
||||
|
||||
print("==========step2")
|
||||
print("query diff && top")
|
||||
tdSql.error('select diff(c1),top(c2) from t1')
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -16,10 +16,16 @@ import sys
|
|||
import os
|
||||
import taos
|
||||
import time
|
||||
import argparse
|
||||
|
||||
|
||||
class taosdemoQueryPerformace:
|
||||
def initConnection(self):
|
||||
def __init__(self, clearCache, commitID, dbName, stbName, tbPerfix):
|
||||
self.clearCache = clearCache
|
||||
self.commitID = commitID
|
||||
self.dbName = dbName
|
||||
self.stbName = stbName
|
||||
self.tbPerfix = tbPerfix
|
||||
self.host = "127.0.0.1"
|
||||
self.user = "root"
|
||||
self.password = "taosdata"
|
||||
|
@ -30,92 +36,109 @@ class taosdemoQueryPerformace:
|
|||
self.password,
|
||||
self.config)
|
||||
|
||||
def createPerfTables(self):
|
||||
cursor = self.conn.cursor()
|
||||
cursor.execute("create database if not exists %s" % self.dbName)
|
||||
cursor.execute("use %s" % self.dbName)
|
||||
cursor.execute("create table if not exists %s(ts timestamp, query_time float, commit_id binary(50)) tags(query_id int, query_sql binary(300))" % self.stbName)
|
||||
|
||||
sql = "select count(*) from test.meters"
|
||||
tableid = 1
|
||||
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
|
||||
sql = "select avg(f1), max(f2), min(f3) from test.meters"
|
||||
tableid = 2
|
||||
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
|
||||
sql = "select count(*) from test.meters where loc='beijing'"
|
||||
tableid = 3
|
||||
cursor.execute("create table if not exists %s%d using %s tags(%d, \"%s\")" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
|
||||
sql = "select avg(f1), max(f2), min(f3) from test.meters where areaid=10"
|
||||
tableid = 4
|
||||
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
|
||||
sql = "select avg(f1), max(f2), min(f3) from test.t10 interval(10s)"
|
||||
tableid = 5
|
||||
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
|
||||
sql = "select last_row(*) from meters"
|
||||
tableid = 6
|
||||
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
|
||||
sql = "select * from meters"
|
||||
tableid = 7
|
||||
cursor.execute("create table if not exists %s%d using %s tags(%d, '%s')" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
|
||||
sql = "select avg(f1), max(f2), min(f3) from meters where ts <= '2017-07-15 10:40:01.000' and ts <= '2017-07-15 14:00:40.000'"
|
||||
tableid = 8
|
||||
cursor.execute("create table if not exists %s%d using %s tags(%d, \"%s\")" % (self.tbPerfix, tableid, self.stbName, tableid, sql))
|
||||
|
||||
cursor.close()
|
||||
|
||||
def query(self):
|
||||
cursor = self.conn.cursor()
|
||||
cursor.execute("use test")
|
||||
cursor = self.conn.cursor()
|
||||
print("==================== query performance ====================")
|
||||
|
||||
totalTime = 0
|
||||
for i in range(100):
|
||||
if(sys.argv[1] == '1'):
|
||||
# root permission is required
|
||||
os.system("echo 3 > /proc/sys/vm/drop_caches")
|
||||
startTime = time.time()
|
||||
cursor.execute("select count(*) from test.meters")
|
||||
totalTime += time.time() - startTime
|
||||
print("query time for: select count(*) from test.meters %f seconds" % (totalTime / 100))
|
||||
cursor.execute("use %s" % self.dbName)
|
||||
cursor.execute("select tbname, query_id, query_sql from %s" % self.stbName)
|
||||
|
||||
totalTime = 0
|
||||
for i in range(100):
|
||||
if(sys.argv[1] == '1'):
|
||||
# root permission is required
|
||||
os.system("echo 3 > /proc/sys/vm/drop_caches")
|
||||
startTime = time.time()
|
||||
cursor.execute("select avg(f1), max(f2), min(f3) from test.meters")
|
||||
totalTime += time.time() - startTime
|
||||
print("query time for: select avg(f1), max(f2), min(f3) from test.meters %f seconds" % (totalTime / 100))
|
||||
for data in cursor:
|
||||
table_name = data[0]
|
||||
query_id = data[1]
|
||||
sql = data[2]
|
||||
|
||||
totalTime = 0
|
||||
cursor2 = self.conn.cursor()
|
||||
cursor2.execute("use test")
|
||||
for i in range(100):
|
||||
if(self.clearCache == True):
|
||||
# root permission is required
|
||||
os.system("echo 3 > /proc/sys/vm/drop_caches")
|
||||
|
||||
startTime = time.time()
|
||||
cursor2.execute(sql)
|
||||
totalTime += time.time() - startTime
|
||||
cursor2.close()
|
||||
print("query time for: %s %f seconds" % (sql, totalTime / 100))
|
||||
|
||||
cursor3 = self.conn.cursor()
|
||||
cursor3.execute("insert into %s.%s values(now, %f, '%s')" % (self.dbName, table_name, totalTime / 100, self.commitID))
|
||||
|
||||
totalTime = 0
|
||||
for i in range(100):
|
||||
if(sys.argv[1] == '1'):
|
||||
# root permission is required
|
||||
os.system("echo 3 > /proc/sys/vm/drop_caches")
|
||||
startTime = time.time()
|
||||
cursor.execute("select count(*) from test.meters where loc='beijing'")
|
||||
totalTime += time.time() - startTime
|
||||
print("query time for: select count(*) from test.meters where loc='beijing' %f seconds" % (totalTime / 100))
|
||||
|
||||
totalTime = 0
|
||||
for i in range(100):
|
||||
if(sys.argv[1] == '1'):
|
||||
# root permission is required
|
||||
os.system("echo 3 > /proc/sys/vm/drop_caches")
|
||||
startTime = time.time()
|
||||
cursor.execute("select avg(f1), max(f2), min(f3) from test.meters where areaid=10")
|
||||
totalTime += time.time() - startTime
|
||||
print("query time for: select avg(f1), max(f2), min(f3) from test.meters where areaid=10 %f seconds" % (totalTime / 100))
|
||||
|
||||
totalTime = 0
|
||||
for i in range(100):
|
||||
if(sys.argv[1] == '1'):
|
||||
# root permission is required
|
||||
os.system("echo 3 > /proc/sys/vm/drop_caches")
|
||||
startTime = time.time()
|
||||
cursor.execute("select avg(f1), max(f2), min(f3) from test.t10 interval(10s)")
|
||||
totalTime += time.time() - startTime
|
||||
print("query time for: select avg(f1), max(f2), min(f3) from test.t10 interval(10s) %f seconds" % (totalTime / 100))
|
||||
|
||||
totalTime = 0
|
||||
for i in range(100):
|
||||
if(sys.argv[1] == '1'):
|
||||
# root permission is required
|
||||
os.system("echo 3 > /proc/sys/vm/drop_caches")
|
||||
startTime = time.time()
|
||||
cursor.execute("select last_row(*) from meters")
|
||||
totalTime += time.time() - startTime
|
||||
print("query time for: select last_row(*) from meters %f seconds" % (totalTime / 100))
|
||||
|
||||
totalTime = 0
|
||||
for i in range(100):
|
||||
if(sys.argv[1] == '1'):
|
||||
# root permission is required
|
||||
os.system("echo 3 > /proc/sys/vm/drop_caches")
|
||||
startTime = time.time()
|
||||
cursor.execute("select * from meters")
|
||||
totalTime += time.time() - startTime
|
||||
print("query time for: select * from meters %f seconds" % (totalTime / 100))
|
||||
|
||||
totalTime = 0
|
||||
for i in range(100):
|
||||
if(sys.argv[1] == '1'):
|
||||
# root permission is required
|
||||
os.system("echo 3 > /proc/sys/vm/drop_caches")
|
||||
startTime = time.time()
|
||||
cursor.execute("select avg(f1), max(f2), min(f3) from meters where ts <= '2017-07-15 10:40:01.000' and ts <= '2017-07-15 14:00:40.000'")
|
||||
totalTime += time.time() - startTime
|
||||
print("query time for: select avg(f1), max(f2), min(f3) from meters where ts <= '2017-07-15 10:40:01.000' and ts <= '2017-07-15 14:00:40.000' %f seconds" % (totalTime / 100))
|
||||
cursor3.close()
|
||||
cursor.close()
|
||||
|
||||
if __name__ == '__main__':
|
||||
perftest = taosdemoQueryPerformace()
|
||||
perftest.initConnection()
|
||||
perftest.query()
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
'-r',
|
||||
'--remove-cache',
|
||||
action='store_true',
|
||||
default=False,
|
||||
help='clear cache before query (default: False)')
|
||||
parser.add_argument(
|
||||
'-c',
|
||||
'--commit-id',
|
||||
action='store',
|
||||
default='null',
|
||||
type=str,
|
||||
help='git commit id (default: null)')
|
||||
parser.add_argument(
|
||||
'-d',
|
||||
'--database-name',
|
||||
action='store',
|
||||
default='perf',
|
||||
type=str,
|
||||
help='Database name to be created (default: perf)')
|
||||
parser.add_argument(
|
||||
'-t',
|
||||
'--stable-name',
|
||||
action='store',
|
||||
default='query_tb',
|
||||
type=str,
|
||||
help='table name to be created (default: query_tb)')
|
||||
parser.add_argument(
|
||||
'-p',
|
||||
'--table-perfix',
|
||||
action='store',
|
||||
default='q',
|
||||
type=str,
|
||||
help='table name perfix (default: q)')
|
||||
|
||||
args = parser.parse_args()
|
||||
perftest = taosdemoQueryPerformace(args.remove_cache, args.commit_id, args.database_name, args.stable_name, args.table_perfix)
|
||||
perftest.createPerfTables()
|
||||
perftest.query()
|
||||
|
|
|
@ -52,7 +52,7 @@ class TDTestCase:
|
|||
tdSql.checkRows(5)
|
||||
|
||||
sql = ''' select * from st where loc = 'nchar0' limit 1 union all select * from st where loc = 'nchar1' limit 1 union all select * from st where loc = 'nchar2' limit 1
|
||||
union all select * from st where loc = 'nchar3' limit 1 union all select * from st where loc = 'nchar4' limit 1 union all select * from st where loc = 'nchar5''''
|
||||
union all select * from st where loc = 'nchar3' limit 1 union all select * from st where loc = 'nchar4' limit 1 union all select * from st where loc = 'nchar5' limit 1'''
|
||||
tdSql.query(sql)
|
||||
tdSql.checkRows(6)
|
||||
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
import taos
|
||||
from util.log import tdLog
|
||||
from util.cases import tdCases
|
||||
from util.sql import tdSql
|
||||
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
|
||||
print("==============step1")
|
||||
tdSql.execute("create table db.cars(ts timestamp, c int) tags(id int);")
|
||||
tdSql.execute("create database db2")
|
||||
tdSql.error("create table db2.car1 using db.cars tags(1)")
|
||||
tdSql.error("insert into db2.car1 using db1.cars tags(1) values(now, 1);")
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -0,0 +1,55 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, db_test.stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
import taos
|
||||
from util.log import tdLog
|
||||
from util.cases import tdCases
|
||||
from util.sql import tdSql
|
||||
|
||||
|
||||
class TDTestCase:
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
|
||||
print("==============step1")
|
||||
|
||||
tdLog.info("check nchar")
|
||||
tdSql.error("create database anal (ts timestamp ,i nchar(4094))")
|
||||
tdSql.execute(
|
||||
"create table anal (ts timestamp ,i nchar(4093))")
|
||||
|
||||
print("==============step2")
|
||||
tdLog.info("check binary")
|
||||
tdSql.error("create database anal (ts timestamp ,i binary(16375))")
|
||||
tdSql.execute(
|
||||
"create table anal1 (ts timestamp ,i binary(16374))")
|
||||
|
||||
print("==============step3")
|
||||
tdLog.info("check int & binary")
|
||||
tdSql.error("create table anal2 (ts timestamp ,i binary(16371),j int)")
|
||||
tdSql.execute("create table anal2 (ts timestamp ,i binary(16370),j int)")
|
||||
tdSql.execute("create table anal3 (ts timestamp ,i binary(16366), j int, k int)")
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -0,0 +1,93 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
import taos
|
||||
import time
|
||||
import datetime
|
||||
import csv
|
||||
import random
|
||||
import pandas as pd
|
||||
import argparse
|
||||
import os.path
|
||||
|
||||
class taosdemoPerformace:
|
||||
def __init__(self, commitID, dbName, createTableTime, insertRecordsTime, recordsPerSecond):
|
||||
self.commitID = commitID
|
||||
self.dbName = dbName
|
||||
self.createTableTime = createTableTime
|
||||
self.insertRecordsTime = insertRecordsTime
|
||||
self.recordsPerSecond = recordsPerSecond
|
||||
self.host = "127.0.0.1"
|
||||
self.user = "root"
|
||||
self.password = "taosdata"
|
||||
self.config = "/etc/taosperf"
|
||||
self.conn = taos.connect(
|
||||
self.host,
|
||||
self.user,
|
||||
self.password,
|
||||
self.config)
|
||||
|
||||
def createTablesAndStoreData(self):
|
||||
cursor = self.conn.cursor()
|
||||
|
||||
cursor.execute("create database if not exists %s" % self.dbName)
|
||||
cursor.execute("use %s" % self.dbName)
|
||||
cursor.execute("create table if not exists taosdemo_perf (ts timestamp, create_table_time float, insert_records_time float, records_per_second float, commit_id binary(50))")
|
||||
print("==================== taosdemo performance ====================")
|
||||
print("create tables time: %f" % self.createTableTime)
|
||||
print("insert records time: %f" % self.insertRecordsTime)
|
||||
print("records per second: %f" % self.recordsPerSecond)
|
||||
cursor.execute("insert into taosdemo_perf values(now, %f, %f, %f, '%s')" % (self.createTableTime, self.insertRecordsTime, self.recordsPerSecond, self.commitID))
|
||||
cursor.execute("drop database if exists taosdemo_insert_test")
|
||||
|
||||
cursor.close()
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
'-c',
|
||||
'--commit-id',
|
||||
action='store',
|
||||
type=str,
|
||||
help='git commit id (default: null)')
|
||||
parser.add_argument(
|
||||
'-d',
|
||||
'--database-name',
|
||||
action='store',
|
||||
default='perf',
|
||||
type=str,
|
||||
help='Database name to be created (default: perf)')
|
||||
parser.add_argument(
|
||||
'-t',
|
||||
'--create-table',
|
||||
action='store',
|
||||
type=float,
|
||||
help='create table time')
|
||||
parser.add_argument(
|
||||
'-i',
|
||||
'--insert-records',
|
||||
action='store',
|
||||
type=float,
|
||||
help='insert records time')
|
||||
parser.add_argument(
|
||||
'-r',
|
||||
'---records-per-second',
|
||||
action='store',
|
||||
type=float,
|
||||
help='records per request')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
perftest = taosdemoPerformace(args.commit_id, args.database_name, args.create_table, args.insert_records, args.records_per_second)
|
||||
perftest.createTablesAndStoreData()
|
|
@ -24,15 +24,17 @@
|
|||
#define GREEN "\033[1;32m"
|
||||
#define NC "\033[0m"
|
||||
|
||||
int32_t capacity = 100000;
|
||||
int32_t q1Times = 1;
|
||||
int32_t q2Times = 1;
|
||||
int32_t capacity = 128;
|
||||
int32_t q1Times = 10;
|
||||
int32_t q2Times = 10;
|
||||
int32_t keyNum = 100000;
|
||||
int32_t printInterval = 10000;
|
||||
int32_t printInterval = 1000;
|
||||
void * hashHandle;
|
||||
pthread_t thread;
|
||||
|
||||
typedef struct HashTestRow {
|
||||
int32_t size;
|
||||
void * ptr;
|
||||
int32_t keySize;
|
||||
char key[100];
|
||||
} HashTestRow;
|
||||
|
||||
void shellParseArgument(int argc, char *argv[]);
|
||||
|
@ -40,7 +42,7 @@ void shellParseArgument(int argc, char *argv[]);
|
|||
void testHashPerformance() {
|
||||
int64_t initialMs = taosGetTimestampMs();
|
||||
_hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
void * hashHandle = taosHashInit(capacity, hashFp, true);
|
||||
hashHandle = taosHashInit(128, hashFp, true, HASH_NO_LOCK);
|
||||
|
||||
int64_t startMs = taosGetTimestampMs();
|
||||
float seconds = (startMs - initialMs) / 1000.0;
|
||||
|
@ -48,17 +50,25 @@ void testHashPerformance() {
|
|||
|
||||
for (int32_t t = 1; t <= keyNum; ++t) {
|
||||
HashTestRow row = {0};
|
||||
char key[100] = {0};
|
||||
int32_t keySize = sprintf(key, "0.db.st%d", t);
|
||||
row.keySize = sprintf(row.key, "0.db.st%d", t);
|
||||
|
||||
for (int32_t q = 0; q < q1Times; q++) {
|
||||
taosHashGet(hashHandle, &key, keySize);
|
||||
taosHashGet(hashHandle, row.key, row.keySize);
|
||||
}
|
||||
|
||||
taosHashPut(hashHandle, key, keySize, &row, sizeof(HashTestRow));
|
||||
taosHashPut(hashHandle, row.key, row.keySize, &row, sizeof(HashTestRow));
|
||||
|
||||
for (int32_t q = 0; q < q2Times; q++) {
|
||||
taosHashGet(hashHandle, &key, keySize);
|
||||
taosHashGet(hashHandle, row.key, row.keySize);
|
||||
}
|
||||
|
||||
// test iterator
|
||||
{
|
||||
HashTestRow *row = taosHashIterate(hashHandle, NULL);
|
||||
while (row) {
|
||||
taosHashGet(hashHandle, row->key, row->keySize);
|
||||
row = taosHashIterate(hashHandle, row);
|
||||
}
|
||||
}
|
||||
|
||||
if (t % printInterval == 0) {
|
||||
|
@ -80,9 +90,35 @@ void testHashPerformance() {
|
|||
taosHashCleanup(hashHandle);
|
||||
}
|
||||
|
||||
void *multiThreadFunc(void *param) {
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
taosMsleep(1000);
|
||||
HashTestRow *row = taosHashIterate(hashHandle, NULL);
|
||||
while (row) {
|
||||
taosHashGet(hashHandle, row->key, row->keySize);
|
||||
row = taosHashIterate(hashHandle, row);
|
||||
}
|
||||
int64_t hashSize = taosHashGetSize(hashHandle);
|
||||
pPrint("i:%d hashSize:%ld", i, hashSize);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void multiThreadTest() {
|
||||
pthread_attr_t thattr;
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
// Start threads to write
|
||||
pthread_create(&thread, &thattr, multiThreadFunc, NULL);
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
shellParseArgument(argc, argv);
|
||||
multiThreadTest();
|
||||
testHashPerformance();
|
||||
pthread_join(thread, NULL);
|
||||
}
|
||||
|
||||
void printHelp() {
|
||||
|
|
Loading…
Reference in New Issue