Merge branch 'develop' into compress_float

This commit is contained in:
tickduan 2021-07-22 19:42:12 +08:00
commit 49cf75d768
104 changed files with 4704 additions and 3067 deletions

View File

@ -23,6 +23,8 @@
static SBnThread tsBnThread;
static void *bnThreadFunc(void *arg) {
setThreadName("bnThreadd");
while (1) {
pthread_mutex_lock(&tsBnThread.mutex);
if (tsBnThread.stop) {

View File

@ -110,6 +110,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf);
int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo);
int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows);
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo);
void doRetrieveSubqueryData(SSchedMsg *pMsg);

View File

@ -138,7 +138,8 @@ typedef struct STableDataBlocks {
uint32_t size;
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
char *pData;
bool cloned;
SParsedDataColInfo boundColumnInfo;
// for parameter ('?') binding
@ -411,6 +412,7 @@ int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* s
int32_t tscValidateSqlInfo(SSqlObj *pSql, struct SSqlInfo *pInfo);
int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows);
extern int32_t sentinel;
extern SHashObj *tscVgroupMap;
extern SHashObj *tscTableMetaInfo;
@ -436,4 +438,4 @@ int32_t getExtendedRowSize(STableComInfo *tinfo);
}
#endif
#endif
#endif

View File

@ -946,3 +946,34 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsI
return JNI_SUCCESS;
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_insertLinesImp(JNIEnv *env, jobject jobj,
jobjectArray lines, jlong conn) {
TAOS *taos = (TAOS *)conn;
if (taos == NULL) {
jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
int numLines = (*env)->GetArrayLength(env, lines);
char** c_lines = calloc(numLines, sizeof(char*));
for (int i = 0; i < numLines; ++i) {
jstring line = (jstring) ((*env)->GetObjectArrayElement(env, lines, i));
c_lines[i] = (char*)(*env)->GetStringUTFChars(env, line, 0);
}
int code = taos_insert_lines(taos, c_lines, numLines);
for (int i = 0; i < numLines; ++i) {
jstring line = (jstring) ((*env)->GetObjectArrayElement(env, lines, i));
(*env)->ReleaseStringUTFChars(env, line, c_lines[i]);
}
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, taos, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
return code;
}

View File

@ -1056,7 +1056,7 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3
return TSDB_CODE_SUCCESS;
}
static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
int32_t FORCE_INLINE tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
pBlocks->tid = pTableMeta->id.tid;
pBlocks->uid = pTableMeta->id.uid;
pBlocks->sversion = pTableMeta->sversion;
@ -1904,7 +1904,6 @@ int tsInsertInitialCheck(SSqlObj *pSql) {
int tsParseSql(SSqlObj *pSql, bool initial) {
int32_t ret = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &pSql->cmd;
if (!initial) {
tscDebug("0x%"PRIx64" resume to parse sql: %s", pSql->self, pCmd->insertParam.sql);
}

File diff suppressed because it is too large Load Diff

View File

@ -47,6 +47,7 @@ typedef struct SNormalStmt {
typedef struct SMultiTbStmt {
bool nameSet;
bool tagSet;
bool subSet;
uint64_t currentUid;
char *sqlstr;
uint32_t tbNum;
@ -54,6 +55,7 @@ typedef struct SMultiTbStmt {
SStrToken stbname;
SStrToken values;
SArray *tags;
STableDataBlocks *lastBlock;
SHashObj *pTableHash;
SHashObj *pTableBlockHashList; // data block for each table
} SMultiTbStmt;
@ -347,11 +349,11 @@ int32_t fillTablesColumnsNull(SSqlObj* pSql) {
////////////////////////////////////////////////////////////////////////////////
// functions for insertion statement preparation
static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) {
if (bind->is_null != NULL && *(bind->is_null)) {
setNull(data + param->offset, param->type, param->bytes);
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) {
if (bind->is_null != NULL && *(bind->is_null)) {
setNull(data + param->offset, param->type, param->bytes);
return TSDB_CODE_SUCCESS;
}
#if 0
if (0) {
@ -746,25 +748,25 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param,
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
size = 1;
*(uint8_t *)(data + param->offset) = *(uint8_t *)bind->buffer;
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
size = 2;
*(uint16_t *)(data + param->offset) = *(uint16_t *)bind->buffer;
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_FLOAT:
size = 4;
*(uint32_t *)(data + param->offset) = *(uint32_t *)bind->buffer;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_TIMESTAMP:
size = 8;
*(uint64_t *)(data + param->offset) = *(uint64_t *)bind->buffer;
break;
case TSDB_DATA_TYPE_BINARY:
@ -790,7 +792,6 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param,
return TSDB_CODE_TSC_INVALID_VALUE;
}
memcpy(data + param->offset, bind->buffer, size);
if (param->offset == 0) {
if (tsCheckTimestamp(pBlock, data + param->offset) != TSDB_CODE_SUCCESS) {
tscError("invalid timestamp");
@ -801,6 +802,58 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param,
return TSDB_CODE_SUCCESS;
}
static int32_t insertStmtGenLastBlock(STableDataBlocks** lastBlock, STableDataBlocks* pBlock) {
*lastBlock = (STableDataBlocks*)malloc(sizeof(STableDataBlocks));
memcpy(*lastBlock, pBlock, sizeof(STableDataBlocks));
(*lastBlock)->cloned = true;
(*lastBlock)->pData = NULL;
(*lastBlock)->ordered = true;
(*lastBlock)->prevTS = INT64_MIN;
(*lastBlock)->size = sizeof(SSubmitBlk);
(*lastBlock)->tsSource = -1;
return TSDB_CODE_SUCCESS;
}
static int32_t insertStmtGenBlock(STscStmt* pStmt, STableDataBlocks** pBlock, STableMeta* pTableMeta, SName* name) {
int32_t code = 0;
if (pStmt->mtb.lastBlock == NULL) {
tscError("no previous data block");
return TSDB_CODE_TSC_APP_ERROR;
}
int32_t msize = tscGetTableMetaSize(pTableMeta);
int32_t tsize = sizeof(STableDataBlocks) + msize;
void *t = malloc(tsize);
*pBlock = t;
memcpy(*pBlock, pStmt->mtb.lastBlock, sizeof(STableDataBlocks));
t = (char *)t + sizeof(STableDataBlocks);
(*pBlock)->pTableMeta = t;
memcpy((*pBlock)->pTableMeta, pTableMeta, msize);
(*pBlock)->pData = malloc((*pBlock)->nAllocSize);
(*pBlock)->vgId = (*pBlock)->pTableMeta->vgId;
tNameAssign(&(*pBlock)->tableName, name);
SSubmitBlk* blk = (SSubmitBlk*)(*pBlock)->pData;
memset(blk, 0, sizeof(*blk));
code = tsSetBlockInfo(blk, pTableMeta, 0);
if (code != TSDB_CODE_SUCCESS) {
STMT_RET(code);
}
return TSDB_CODE_SUCCESS;
}
static int doBindBatchParam(STableDataBlocks* pBlock, SParamInfo* param, TAOS_MULTI_BIND* bind, int32_t rowNum) {
if (bind->buffer_type != param->type || !isValidDataType(param->type)) {
@ -1172,7 +1225,7 @@ static void insertBatchClean(STscStmt* pStmt) {
static int insertBatchStmtExecute(STscStmt* pStmt) {
int32_t code = 0;
if(pStmt->mtb.nameSet == false) {
tscError("0x%"PRIx64" no table name set", pStmt->pSql->self);
return invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), "no table name set");
@ -1227,11 +1280,11 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
pStmt->mtb.tbname = sToken;
pStmt->mtb.nameSet = false;
if (pStmt->mtb.pTableHash == NULL) {
pStmt->mtb.pTableHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
pStmt->mtb.pTableHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
}
if (pStmt->mtb.pTableBlockHashList == NULL) {
pStmt->mtb.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
pStmt->mtb.pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
}
pStmt->mtb.tagSet = true;
@ -1522,6 +1575,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags) {
STscStmt* pStmt = (STscStmt*)stmt;
int32_t code = 0;
if (stmt == NULL || pStmt->pSql == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
@ -1559,6 +1613,11 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
SSubmitBlk* pBlk = (SSubmitBlk*) (*t1)->pData;
pCmd->batchSize = pBlk->numOfRows;
if (pBlk->numOfRows == 0) {
(*t1)->prevTS = INT64_MIN;
}
tsSetBlockInfo(pBlk, (*t1)->pTableMeta, pBlk->numOfRows);
taosHashPut(pCmd->insertParam.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)t1, POINTER_BYTES);
@ -1566,6 +1625,51 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
STMT_RET(TSDB_CODE_SUCCESS);
}
if (pStmt->mtb.subSet && taosHashGetSize(pStmt->mtb.pTableHash) > 0) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
char sTableName[TSDB_TABLE_FNAME_LEN];
strncpy(sTableName, pTableMeta->sTableName, sizeof(sTableName));
SStrToken tname = {0};
tname.type = TK_STRING;
tname.z = (char *)name;
tname.n = (uint32_t)strlen(name);
SName fullname = {0};
tscSetTableFullName(&fullname, &tname, pSql);
memcpy(&pTableMetaInfo->name, &fullname, sizeof(fullname));
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code != TSDB_CODE_SUCCESS) {
STMT_RET(code);
}
pTableMeta = pTableMetaInfo->pTableMeta;
if (strcmp(sTableName, pTableMeta->sTableName)) {
tscError("0x%"PRIx64" only tables belongs to one stable is allowed", pSql->self);
STMT_RET(TSDB_CODE_TSC_APP_ERROR);
}
STableDataBlocks* pBlock = NULL;
insertStmtGenBlock(pStmt, &pBlock, pTableMeta, &pTableMetaInfo->name);
pCmd->batchSize = 0;
pStmt->mtb.currentUid = pTableMeta->id.uid;
pStmt->mtb.tbNum++;
taosHashPut(pCmd->insertParam.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES);
taosHashPut(pStmt->mtb.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES);
taosHashPut(pStmt->mtb.pTableHash, name, strlen(name), (char*) &pTableMeta->id.uid, sizeof(pTableMeta->id.uid));
tscDebug("0x%"PRIx64" table:%s is prepared, uid:%" PRIx64, pSql->self, name, pStmt->mtb.currentUid);
STMT_RET(TSDB_CODE_SUCCESS);
}
if (pStmt->mtb.tagSet) {
pStmt->mtb.tbname = tscReplaceStrToken(&pSql->sqlstr, &pStmt->mtb.tbname, name);
} else {
@ -1594,7 +1698,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
pCmd->insertParam.pTableBlockHashList = hashList;
}
int32_t code = tsParseSql(pStmt->pSql, true);
code = tsParseSql(pStmt->pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
// wait for the callback function to post the semaphore
tsem_wait(&pStmt->pSql->rspSem);
@ -1622,6 +1726,10 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
taosHashPut(pStmt->mtb.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES);
taosHashPut(pStmt->mtb.pTableHash, name, strlen(name), (char*) &pTableMeta->id.uid, sizeof(pTableMeta->id.uid));
if (pStmt->mtb.lastBlock == NULL) {
insertStmtGenLastBlock(&pStmt->mtb.lastBlock, pBlock);
}
tscDebug("0x%"PRIx64" table:%s is prepared, uid:%" PRIx64, pSql->self, name, pStmt->mtb.currentUid);
}
@ -1629,7 +1737,17 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
}
int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name) {
STscStmt* pStmt = (STscStmt*)stmt;
pStmt->mtb.subSet = true;
return taos_stmt_set_tbname_tags(stmt, name, NULL);
}
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
STscStmt* pStmt = (STscStmt*)stmt;
pStmt->mtb.subSet = false;
return taos_stmt_set_tbname_tags(stmt, name, NULL);
}
@ -1653,6 +1771,7 @@ int taos_stmt_close(TAOS_STMT* stmt) {
if (pStmt->pSql && pStmt->pSql->res.code != 0) {
rmMeta = true;
}
tscDestroyDataBlock(pStmt->mtb.lastBlock, rmMeta);
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, rmMeta);
taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList);
pStmt->pSql->cmd.insertParam.pTableBlockHashList = NULL;
@ -1687,6 +1806,8 @@ int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) {
pStmt->last = STMT_BIND;
tscDebug("tableId:%" PRIu64 ", try to bind one row", pStmt->mtb.currentUid);
STMT_RET(insertStmtBindParam(pStmt, bind));
} else {
STMT_RET(normalStmtBindParam(pStmt, bind));

View File

@ -2023,6 +2023,11 @@ static int32_t checkForUdf(SSqlObj* pSql, SQueryInfo* pQueryInfo, SArray* pSelec
*/
static SUdfInfo* isValidUdf(SArray* pUdfInfo, const char* name, int32_t len) {
if(pUdfInfo == NULL){
tscError("udfinfo is null");
return NULL;
}
size_t t = taosArrayGetSize(pUdfInfo);
for(int32_t i = 0; i < t; ++i) {
SUdfInfo* pUdf = taosArrayGet(pUdfInfo, i);

View File

@ -1517,12 +1517,6 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
}
tfree(pDataBlock->pData);
tfree(pDataBlock->params);
// free the refcount for metermeta
if (pDataBlock->pTableMeta != NULL) {
tfree(pDataBlock->pTableMeta);
}
if (removeMeta) {
char name[TSDB_TABLE_FNAME_LEN] = {0};
@ -1531,7 +1525,17 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
}
tscDestroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
if (!pDataBlock->cloned) {
tfree(pDataBlock->params);
// free the refcount for metermeta
if (pDataBlock->pTableMeta != NULL) {
tfree(pDataBlock->pTableMeta);
}
tscDestroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
}
tfree(pDataBlock);
}
@ -1710,12 +1714,14 @@ int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOff
dataBuf->nAllocSize = dataBuf->headerSize * 2;
}
dataBuf->pData = calloc(1, dataBuf->nAllocSize);
//dataBuf->pData = calloc(1, dataBuf->nAllocSize);
dataBuf->pData = malloc(dataBuf->nAllocSize);
if (dataBuf->pData == NULL) {
tscError("failed to allocated memory, reason:%s", strerror(errno));
tfree(dataBuf);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
memset(dataBuf->pData, 0, sizeof(SSubmitBlk));
//Here we keep the tableMeta to avoid it to be remove by other threads.
dataBuf->pTableMeta = tscTableMetaDup(pTableMeta);
@ -1956,16 +1962,14 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeBlockMap) {
pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList);
if (pInsertParam->pTableNameList == NULL) {
pInsertParam->pTableNameList = calloc(pInsertParam->numOfTables, POINTER_BYTES);
} else {
memset(pInsertParam->pTableNameList, 0, pInsertParam->numOfTables * POINTER_BYTES);
pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES);
}
STableDataBlocks **p1 = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
int32_t i = 0;
while(p1) {
STableDataBlocks* pBlocks = *p1;
tfree(pInsertParam->pTableNameList[i]);
//tfree(pInsertParam->pTableNameList[i]);
pInsertParam->pTableNameList[i++] = tNameDup(&pBlocks->tableName);
p1 = taosHashIterate(pInsertParam->pTableBlockHashList, p1);
@ -2009,14 +2013,12 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
if (dataBuf->nAllocSize < destSize) {
while (dataBuf->nAllocSize < destSize) {
dataBuf->nAllocSize = (uint32_t)(dataBuf->nAllocSize * 1.5);
}
dataBuf->nAllocSize = (uint32_t)(destSize * 1.5);
char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize);
if (tmp != NULL) {
dataBuf->pData = tmp;
memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size);
//memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size);
} else { // failed to allocate memory, free already allocated memory and return error code
tscError("0x%"PRIx64" failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, dataBuf->nAllocSize);
@ -3559,6 +3561,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
pNew->sqlstr = strdup(pSql->sqlstr);
tsem_init(&pNew->rspSem, 0, 0);
SSqlCmd* pnCmd = &pNew->cmd;
memcpy(pnCmd, pCmd, sizeof(SSqlCmd));
@ -3919,7 +3922,7 @@ bool tscIsUpdateQuery(SSqlObj* pSql) {
}
SSqlCmd* pCmd = &pSql->cmd;
return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || TSDB_SQL_USE_DB == pCmd->command);
return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || TSDB_SQL_RESET_CACHE == pCmd->command || TSDB_SQL_USE_DB == pCmd->command);
}
char* tscGetSqlStr(SSqlObj* pSql) {
@ -4384,7 +4387,7 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
size_t size = tscGetTableMetaSize(pTableMeta);
STableMeta* p = calloc(1, size);
STableMeta* p = malloc(size);
memcpy(p, pTableMeta, size);
return p;
}
@ -4768,15 +4771,6 @@ static void freeContent(void* p) {
tfree(ptr);
}
static int32_t contCompare(const void* p1, const void* p2) {
int32_t ret = strcmp(p1, p2);
if (ret == 0) {
return 0;
} else {
return ret > 0 ? 1:-1;
}
}
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray) {
SSqlCmd *pCmd = &pSql->cmd;
@ -4824,7 +4818,7 @@ int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t lengt
}
taosArraySort(pNameArray, nameComparFn);
taosArrayRemoveDuplicate(pNameArray, contCompare, freeContent);
taosArrayRemoveDuplicate(pNameArray, nameComparFn, freeContent);
return TSDB_CODE_SUCCESS;
}

View File

@ -306,7 +306,7 @@ bool tIsValidName(const SName* name) {
SName* tNameDup(const SName* name) {
assert(name != NULL);
SName* p = calloc(1, sizeof(SName));
SName* p = malloc(sizeof(SName));
memcpy(p, name, sizeof(SName));
return p;
}

View File

@ -37,13 +37,6 @@
<maven.test.jvmargs></maven.test.jvmargs>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<!-- for restful -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
@ -52,12 +45,18 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
<version>1.2.76</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.0-jre</version>
<version>30.1.1-jre</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
</dependencies>

View File

@ -171,11 +171,7 @@ public abstract class AbstractConnection extends WrapperImpl implements Connecti
// do nothing
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
private void checkResultSetTypeAndResultSetConcurrency(int resultSetType, int resultSetConcurrency) throws SQLException {
switch (resultSetType) {
case ResultSet.TYPE_FORWARD_ONLY:
break;
@ -194,7 +190,14 @@ public abstract class AbstractConnection extends WrapperImpl implements Connecti
default:
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_VARIABLE);
}
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
checkResultSetTypeAndResultSetConcurrency(resultSetType, resultSetConcurrency);
return createStatement();
}
@ -203,24 +206,7 @@ public abstract class AbstractConnection extends WrapperImpl implements Connecti
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
switch (resultSetType) {
case ResultSet.TYPE_FORWARD_ONLY:
break;
case ResultSet.TYPE_SCROLL_INSENSITIVE:
case ResultSet.TYPE_SCROLL_SENSITIVE:
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD);
default:
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_VARIABLE);
}
switch (resultSetConcurrency) {
case ResultSet.CONCUR_READ_ONLY:
break;
case ResultSet.CONCUR_UPDATABLE:
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD);
default:
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_VARIABLE);
}
checkResultSetTypeAndResultSetConcurrency(resultSetType, resultSetConcurrency);
return prepareStatement(sql);
}

View File

@ -326,7 +326,7 @@ public class EmptyResultSet implements ResultSet {
@Override
public int getFetchDirection() throws SQLException {
return 0;
return ResultSet.FETCH_FORWARD;
}
@Override
@ -341,12 +341,12 @@ public class EmptyResultSet implements ResultSet {
@Override
public int getType() throws SQLException {
return 0;
return ResultSet.TYPE_FORWARD_ONLY;
}
@Override
public int getConcurrency() throws SQLException {
return 0;
return ResultSet.CONCUR_READ_ONLY;
}
@Override
@ -746,7 +746,7 @@ public class EmptyResultSet implements ResultSet {
@Override
public int getHoldability() throws SQLException {
return 0;
return ResultSet.CLOSE_CURSORS_AT_COMMIT;
}
@Override

View File

@ -40,7 +40,7 @@ public class TSDBErrorNumbers {
public static final int ERROR_JNI_FETCH_END = 0x2358; // fetch to the end of resultSet
public static final int ERROR_JNI_OUT_OF_MEMORY = 0x2359; // JNI alloc memory failed
private static final Set<Integer> errorNumbers = new HashSet();
private static final Set<Integer> errorNumbers = new HashSet<>();
static {
errorNumbers.add(ERROR_CONNECTION_CLOSED);

View File

@ -348,4 +348,13 @@ public class TSDBJNIConnector {
}
private native int closeStmt(long stmt, long con);
public void insertLines(String[] lines) throws SQLException {
int code = insertLinesImp(lines, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to insertLines");
}
}
private native int insertLinesImp(String[] lines, long conn);
}

View File

@ -61,7 +61,7 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet {
return;
// parse row data
for (int rowIndex = 0; rowIndex < data.size(); rowIndex++) {
List<Object> row = new ArrayList();
List<Object> row = new ArrayList<>();
JSONArray jsonRow = data.getJSONArray(rowIndex);
for (int colIndex = 0; colIndex < this.metaData.getColumnCount(); colIndex++) {
row.add(parseColumnData(jsonRow, colIndex, columns.get(colIndex).taos_type));

View File

@ -9,6 +9,7 @@ import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeaderElementIterator;
@ -34,7 +35,11 @@ public class HttpClientPoolUtil {
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE);
connectionManager.setMaxTotal(DEFAULT_MAX_TOTAL);
httpClient = HttpClients.custom().setKeepAliveStrategy(DEFAULT_KEEP_ALIVE_STRATEGY).setConnectionManager(connectionManager).build();
httpClient = HttpClients.custom()
.setKeepAliveStrategy(DEFAULT_KEEP_ALIVE_STRATEGY)
.setConnectionManager(connectionManager)
.setRetryHandler(new DefaultHttpRequestRetryHandler(3, true))
.build();
}
}

View File

@ -4,14 +4,14 @@ public class OSUtils {
private static final String OS = System.getProperty("os.name").toLowerCase();
public static boolean isWindows() {
return OS.indexOf("win") >= 0;
return OS.contains("win");
}
public static boolean isMac() {
return OS.indexOf("mac") >= 0;
return OS.contains("mac");
}
public static boolean isLinux() {
return OS.indexOf("nux") >= 0;
return OS.contains("nux");
}
}

View File

@ -16,7 +16,7 @@ package com.taosdata.jdbc.utils;
public class SqlSyntaxValidator {
private static final String[] SQL = {"select", "insert", "import", "create", "use", "alter", "drop", "set", "show", "describe"};
private static final String[] SQL = {"select", "insert", "import", "create", "use", "alter", "drop", "set", "show", "describe", "reset"};
private static final String[] updateSQL = {"insert", "import", "create", "use", "alter", "drop", "set"};
private static final String[] querySQL = {"select", "show", "describe"};
@ -61,29 +61,11 @@ public class SqlSyntaxValidator {
public static boolean isUseSql(String sql) {
return sql.trim().toLowerCase().startsWith("use");
// || sql.trim().toLowerCase().matches("create\\s*database.*") || sql.toLowerCase().toLowerCase().matches("drop\\s*database.*");
}
public static boolean isShowSql(String sql) {
return sql.trim().toLowerCase().startsWith("show");
}
public static boolean isDescribeSql(String sql) {
return sql.trim().toLowerCase().startsWith("describe");
}
public static boolean isInsertSql(String sql) {
return sql.trim().toLowerCase().startsWith("insert") || sql.trim().toLowerCase().startsWith("import");
}
public static boolean isSelectSql(String sql) {
return sql.trim().toLowerCase().startsWith("select");
}
public static boolean isShowDatabaseSql(String sql) {
return sql.trim().toLowerCase().matches("show\\s*databases");
}
}

View File

@ -7,9 +7,9 @@ import java.util.concurrent.atomic.AtomicLong;
public class TaosInfo implements TaosInfoMBean {
private static volatile TaosInfo instance;
private AtomicLong connect_open = new AtomicLong();
private AtomicLong connect_close = new AtomicLong();
private AtomicLong statement_count = new AtomicLong();
private final AtomicLong connect_open = new AtomicLong();
private final AtomicLong connect_close = new AtomicLong();
private final AtomicLong statement_count = new AtomicLong();
static {
try {

View File

@ -22,8 +22,7 @@ import java.util.stream.IntStream;
public class Utils {
private static Pattern ptn = Pattern.compile(".*?'");
private static final Pattern ptn = Pattern.compile(".*?'");
private static final DateTimeFormatter milliSecFormatter = new DateTimeFormatterBuilder().appendPattern("yyyy-MM-dd HH:mm:ss.SSS").toFormatter();
private static final DateTimeFormatter microSecFormatter = new DateTimeFormatterBuilder().appendPattern("yyyy-MM-dd HH:mm:ss.SSSSSS").toFormatter();
private static final DateTimeFormatter nanoSecFormatter = new DateTimeFormatterBuilder().appendPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").toFormatter();
@ -74,7 +73,7 @@ public class Utils {
public static String escapeSingleQuota(String origin) {
Matcher m = ptn.matcher(origin);
StringBuffer sb = new StringBuffer();
StringBuilder sb = new StringBuilder();
int end = 0;
while (m.find()) {
end = m.end();
@ -87,7 +86,7 @@ public class Utils {
sb.append(seg);
}
} else { // len > 1
sb.append(seg.substring(0, seg.length() - 2));
sb.append(seg, 0, seg.length() - 2);
char lastcSec = seg.charAt(seg.length() - 2);
if (lastcSec == '\\') {
sb.append("\\'");
@ -195,7 +194,7 @@ public class Utils {
public static String formatTimestamp(Timestamp timestamp) {
int nanos = timestamp.getNanos();
if (nanos % 1000000l != 0)
if (nanos % 1000000L != 0)
return timestamp.toLocalDateTime().format(microSecFormatter);
return timestamp.toLocalDateTime().format(milliSecFormatter);
}

View File

@ -5,7 +5,6 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.management.OperationsException;
import java.sql.*;
import java.util.Properties;

View File

@ -1,16 +1,16 @@
package com.taosdata.jdbc;
import org.junit.Test;
import static org.junit.Assert.*;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.util.ArrayList;
import java.util.List;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TSDBJNIConnectorTest {
@ -114,6 +114,10 @@ public class TSDBJNIConnectorTest {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_RESULT_SET_NULL);
}
// close statement
connector.executeQuery("use d");
String[] lines = new String[] {"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns"};
connector.insertLines(lines);
// close connection
connector.closeConnection();

View File

@ -1,6 +1,5 @@
package com.taosdata.jdbc;
import com.taosdata.jdbc.rs.RestfulParameterMetaData;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;

View File

@ -2,7 +2,6 @@ package com.taosdata.jdbc;
import org.junit.*;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.*;
import java.util.ArrayList;
@ -26,7 +25,7 @@ public class TSDBPreparedStatementTest {
long ts = System.currentTimeMillis();
pstmt_insert.setTimestamp(1, new Timestamp(ts));
pstmt_insert.setInt(2, 2);
pstmt_insert.setLong(3, 3l);
pstmt_insert.setLong(3, 3L);
pstmt_insert.setFloat(4, 3.14f);
pstmt_insert.setDouble(5, 3.1415);
pstmt_insert.setShort(6, (short) 6);
@ -53,8 +52,8 @@ public class TSDBPreparedStatementTest {
Assert.assertEquals(ts, rs.getTimestamp(1).getTime());
Assert.assertEquals(2, rs.getInt(2));
Assert.assertEquals(2, rs.getInt("f1"));
Assert.assertEquals(3l, rs.getLong(3));
Assert.assertEquals(3l, rs.getLong("f2"));
Assert.assertEquals(3L, rs.getLong(3));
Assert.assertEquals(3L, rs.getLong("f2"));
Assert.assertEquals(3.14f, rs.getFloat(4), 0.0);
Assert.assertEquals(3.14f, rs.getFloat("f3"), 0.0);
Assert.assertEquals(3.1415, rs.getDouble(5), 0.0);
@ -312,14 +311,14 @@ public class TSDBPreparedStatementTest {
Random r = new Random();
s.setTableName("weather_test");
ArrayList<Long> ts = new ArrayList<Long>();
ArrayList<Long> ts = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
ts.add(System.currentTimeMillis() + i);
}
s.setTimestamp(0, ts);
int random = 10 + r.nextInt(5);
ArrayList<String> s2 = new ArrayList<String>();
ArrayList<String> s2 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
if (i % random == 0) {
s2.add(null);
@ -330,7 +329,7 @@ public class TSDBPreparedStatementTest {
s.setNString(1, s2, 4);
random = 10 + r.nextInt(5);
ArrayList<Float> s3 = new ArrayList<Float>();
ArrayList<Float> s3 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
if (i % random == 0) {
s3.add(null);
@ -341,7 +340,7 @@ public class TSDBPreparedStatementTest {
s.setFloat(2, s3);
random = 10 + r.nextInt(5);
ArrayList<Double> s4 = new ArrayList<Double>();
ArrayList<Double> s4 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
if (i % random == 0) {
s4.add(null);
@ -352,7 +351,7 @@ public class TSDBPreparedStatementTest {
s.setDouble(3, s4);
random = 10 + r.nextInt(5);
ArrayList<Long> ts2 = new ArrayList<Long>();
ArrayList<Long> ts2 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
if (i % random == 0) {
ts2.add(null);
@ -379,13 +378,13 @@ public class TSDBPreparedStatementTest {
if (i % random == 0) {
sb.add(null);
} else {
sb.add(i % 2 == 0 ? true : false);
sb.add(i % 2 == 0);
}
}
s.setBoolean(6, sb);
random = 10 + r.nextInt(5);
ArrayList<String> s5 = new ArrayList<String>();
ArrayList<String> s5 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
if (i % random == 0) {
s5.add(null);
@ -424,14 +423,14 @@ public class TSDBPreparedStatementTest {
Random r = new Random();
s.setTableName("weather_test");
ArrayList<Long> ts = new ArrayList<Long>();
ArrayList<Long> ts = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
ts.add(System.currentTimeMillis() + i);
}
s.setTimestamp(0, ts);
int random = 10 + r.nextInt(5);
ArrayList<String> s2 = new ArrayList<String>();
ArrayList<String> s2 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
if (i % random == 0) {
s2.add(null);
@ -442,7 +441,7 @@ public class TSDBPreparedStatementTest {
s.setNString(1, s2, 4);
random = 10 + r.nextInt(5);
ArrayList<String> s3 = new ArrayList<String>();
ArrayList<String> s3 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
if (i % random == 0) {
s3.add(null);
@ -471,7 +470,7 @@ public class TSDBPreparedStatementTest {
public void bindDataWithSingleTagTest() throws SQLException {
Statement stmt = conn.createStatement();
String types[] = new String[]{"tinyint", "smallint", "int", "bigint", "bool", "float", "double", "binary(10)", "nchar(10)"};
String[] types = new String[]{"tinyint", "smallint", "int", "bigint", "bool", "float", "double", "binary(10)", "nchar(10)"};
for (String type : types) {
stmt.execute("drop table if exists weather_test");
@ -510,21 +509,21 @@ public class TSDBPreparedStatementTest {
}
ArrayList<Long> ts = new ArrayList<Long>();
ArrayList<Long> ts = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
ts.add(System.currentTimeMillis() + i);
}
s.setTimestamp(0, ts);
int random = 10 + r.nextInt(5);
ArrayList<String> s2 = new ArrayList<String>();
ArrayList<String> s2 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
s2.add("分支" + i % 4);
}
s.setNString(1, s2, 10);
random = 10 + r.nextInt(5);
ArrayList<String> s3 = new ArrayList<String>();
ArrayList<String> s3 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
s3.add("test" + i % 4);
}
@ -561,13 +560,13 @@ public class TSDBPreparedStatementTest {
s.setTagString(1, "test");
ArrayList<Long> ts = new ArrayList<Long>();
ArrayList<Long> ts = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
ts.add(System.currentTimeMillis() + i);
}
s.setTimestamp(0, ts);
ArrayList<String> s2 = new ArrayList<String>();
ArrayList<String> s2 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++) {
s2.add("test" + i % 4);
}
@ -788,7 +787,7 @@ public class TSDBPreparedStatementTest {
public void setBigDecimal() throws SQLException {
// given
long ts = System.currentTimeMillis();
BigDecimal bigDecimal = new BigDecimal(3.14444);
BigDecimal bigDecimal = new BigDecimal("3.14444");
// when
pstmt_insert.setTimestamp(1, new Timestamp(ts));
@ -999,7 +998,7 @@ public class TSDBPreparedStatementTest {
long ts = System.currentTimeMillis();
pstmt_insert.setTimestamp(1, new Timestamp(ts));
pstmt_insert.setInt(2, 2);
pstmt_insert.setLong(3, 3l);
pstmt_insert.setLong(3, 3L);
pstmt_insert.setFloat(4, 3.14f);
pstmt_insert.setDouble(5, 3.1415);
pstmt_insert.setShort(6, (short) 6);

View File

@ -95,13 +95,13 @@ public class TSDBResultSetTest {
@Test
public void getBigDecimal() throws SQLException {
BigDecimal f1 = rs.getBigDecimal("f1");
Assert.assertEquals(1609430400000l, f1.longValue());
Assert.assertEquals(1609430400000L, f1.longValue());
BigDecimal f2 = rs.getBigDecimal("f2");
Assert.assertEquals(1, f2.intValue());
BigDecimal f3 = rs.getBigDecimal("f3");
Assert.assertEquals(100l, f3.longValue());
Assert.assertEquals(100L, f3.longValue());
BigDecimal f4 = rs.getBigDecimal("f4");
Assert.assertEquals(3.1415f, f4.floatValue(), 0.00000f);
@ -125,13 +125,13 @@ public class TSDBResultSetTest {
Assert.assertEquals(1, Ints.fromByteArray(f2));
byte[] f3 = rs.getBytes("f3");
Assert.assertEquals(100l, Longs.fromByteArray(f3));
Assert.assertEquals(100L, Longs.fromByteArray(f3));
byte[] f4 = rs.getBytes("f4");
Assert.assertEquals(3.1415f, Float.valueOf(new String(f4)), 0.000000f);
Assert.assertEquals(3.1415f, Float.parseFloat(new String(f4)), 0.000000f);
byte[] f5 = rs.getBytes("f5");
Assert.assertEquals(3.1415926, Double.valueOf(new String(f5)), 0.000000f);
Assert.assertEquals(3.1415926, Double.parseDouble(new String(f5)), 0.000000f);
byte[] f6 = rs.getBytes("f6");
Assert.assertTrue(Arrays.equals("abc".getBytes(), f6));
@ -223,7 +223,7 @@ public class TSDBResultSetTest {
Object f3 = rs.getObject("f3");
Assert.assertEquals(Long.class, f3.getClass());
Assert.assertEquals(100l, f3);
Assert.assertEquals(100L, f3);
Object f4 = rs.getObject("f4");
Assert.assertEquals(Float.class, f4.getClass());
@ -421,7 +421,7 @@ public class TSDBResultSetTest {
@Test(expected = SQLFeatureNotSupportedException.class)
public void updateLong() throws SQLException {
rs.updateLong(1, 1l);
rs.updateLong(1, 1L);
}
@Test(expected = SQLFeatureNotSupportedException.class)

View File

@ -3,12 +3,13 @@ package com.taosdata.jdbc.cases;
import com.taosdata.jdbc.TSDBDriver;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.sql.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
public class BadLocaleSettingTest {

View File

@ -1,6 +1,7 @@
package com.taosdata.jdbc.cases;
import com.taosdata.jdbc.TSDBDriver;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -9,13 +10,13 @@ import java.util.Properties;
public class ConnectMultiTaosdByRestfulWithDifferentTokenTest {
private static String host1 = "192.168.17.156";
private static String user1 = "root";
private static String password1 = "tqueue";
private static final String host1 = "192.168.17.156";
private static final String user1 = "root";
private static final String password1 = "tqueue";
private Connection conn1;
private static String host2 = "192.168.17.82";
private static String user2 = "root";
private static String password2 = "taosdata";
private static final String host2 = "192.168.17.82";
private static final String user2 = "root";
private static final String password2 = "taosdata";
private Connection conn2;
@Test
@ -30,6 +31,7 @@ public class ConnectMultiTaosdByRestfulWithDifferentTokenTest {
try (Statement stmt = connection.createStatement()) {
ResultSet rs = stmt.executeQuery("select server_status()");
ResultSetMetaData meta = rs.getMetaData();
Assert.assertNotNull(meta);
while (rs.next()) {
}
} catch (SQLException e) {

View File

@ -13,7 +13,7 @@ import java.util.Properties;
public class DriverAutoloadTest {
private Properties properties;
private String host = "127.0.0.1";
private final String host = "127.0.0.1";
@Test
public void testRestful() throws SQLException {

View File

@ -13,9 +13,9 @@ import java.util.Random;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class InsertDbwithoutUseDbTest {
private static String host = "127.0.0.1";
private static final String host = "127.0.0.1";
private static Properties properties;
private static Random random = new Random(System.currentTimeMillis());
private static final Random random = new Random(System.currentTimeMillis());
@Test
public void case001() throws ClassNotFoundException, SQLException {

View File

@ -8,14 +8,14 @@ public class InsertSpecialCharacterJniTest {
private static final String host = "127.0.0.1";
private static Connection conn;
private static String dbName = "spec_char_test";
private static String tbname1 = "test";
private static String tbname2 = "weather";
private static String special_character_str_1 = "$asd$$fsfsf$";
private static String special_character_str_2 = "\\\\asdfsfsf\\\\";
private static String special_character_str_3 = "\\\\asdfsfsf\\";
private static String special_character_str_4 = "?asd??fsf?sf?";
private static String special_character_str_5 = "?#sd@$f(('<(s[P)>\"){]}f?s[]{}%vaew|\"fsfs^a&d*jhg)(j))(f@~!?$";
private static final String dbName = "spec_char_test";
private static final String tbname1 = "test";
private static final String tbname2 = "weather";
private static final String special_character_str_1 = "$asd$$fsfsf$";
private static final String special_character_str_2 = "\\\\asdfsfsf\\\\";
private static final String special_character_str_3 = "\\\\asdfsfsf\\";
private static final String special_character_str_4 = "?asd??fsf?sf?";
private static final String special_character_str_5 = "?#sd@$f(('<(s[P)>\"){]}f?s[]{}%vaew|\"fsfs^a&d*jhg)(j))(f@~!?$";
@Test
public void testCase01() throws SQLException {

View File

@ -8,14 +8,14 @@ public class InsertSpecialCharacterRestfulTest {
private static final String host = "127.0.0.1";
private static Connection conn;
private static String dbName = "spec_char_test";
private static String tbname1 = "test";
private static String tbname2 = "weather";
private static String special_character_str_1 = "$asd$$fsfsf$";
private static String special_character_str_2 = "\\\\asdfsfsf\\\\";
private static String special_character_str_3 = "\\\\asdfsfsf\\";
private static String special_character_str_4 = "?asd??fsf?sf?";
private static String special_character_str_5 = "?#sd@$f(('<(s[P)>\"){]}f?s[]{}%vaew|\"fsfs^a&d*jhg)(j))(f@~!?$";
private static final String dbName = "spec_char_test";
private static final String tbname1 = "test";
private static final String tbname2 = "weather";
private static final String special_character_str_1 = "$asd$$fsfsf$";
private static final String special_character_str_2 = "\\\\asdfsfsf\\\\";
private static final String special_character_str_3 = "\\\\asdfsfsf\\";
private static final String special_character_str_4 = "?asd??fsf?sf?";
private static final String special_character_str_5 = "?#sd@$f(('<(s[P)>\"){]}f?s[]{}%vaew|\"fsfs^a&d*jhg)(j))(f@~!?$";
@Test
public void testCase01() throws SQLException {

View File

@ -8,13 +8,13 @@ import java.util.Properties;
public class InvalidResultSetPointerTest {
private static String host = "127.0.0.1";
private static final String host = "127.0.0.1";
private static final String dbName = "test";
private static final String stbName = "stb";
private static final String tbName = "tb";
private static Connection connection;
private static int numOfSTb = 30000;
private static int numOfTb = 3;
private static final int numOfSTb = 30000;
private static final int numOfTb = 3;
private static int numOfThreads = 100;
@Test
@ -74,7 +74,7 @@ public class InvalidResultSetPointerTest {
b = numOfSTb % numOfThreads;
}
multiThreadingClass instance[] = new multiThreadingClass[numOfThreads];
multiThreadingClass[] instance = new multiThreadingClass[numOfThreads];
int last = 0;
for (int i = 0; i < numOfThreads; i++) {

View File

@ -9,8 +9,7 @@ import java.util.concurrent.TimeUnit;
public class MultiThreadsWithSameStatementTest {
private class Service {
private static class Service {
public Connection conn;
public Statement stmt;

View File

@ -0,0 +1,51 @@
package com.taosdata.jdbc.cases;
import com.taosdata.jdbc.TSDBDriver;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
public class ResetQueryCacheTest {
static Connection connection;
static Statement statement;
static String host = "127.0.0.1";
@Before
public void init() {
try {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/", properties);
statement = connection.createStatement();
} catch (SQLException e) {
return;
}
}
@Test
public void testResetQueryCache() throws SQLException {
String resetSql = "reset query cache";
statement.execute(resetSql);
}
@After
public void close() {
try {
if (statement != null)
statement.close();
if (connection != null)
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}

View File

@ -16,9 +16,9 @@ import static org.junit.Assert.assertEquals;
public class StableTest {
private static Connection connection;
private static String dbName = "test";
private static String stbName = "st";
private static String host = "127.0.0.1";
private static final String dbName = "test";
private static final String stbName = "st";
private static final String host = "127.0.0.1";
@BeforeClass
public static void createDatabase() {

View File

@ -25,7 +25,7 @@ public class TaosInfoMonitorTest {
return null;
}).collect(Collectors.toList());
connectionList.stream().forEach(conn -> {
connectionList.forEach(conn -> {
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("show databases");
while (rs.next()) {
@ -37,7 +37,7 @@ public class TaosInfoMonitorTest {
}
});
connectionList.stream().forEach(conn -> {
connectionList.forEach(conn -> {
try {
conn.close();
TimeUnit.MILLISECONDS.sleep(100);

View File

@ -22,7 +22,7 @@ public class TimestampPrecisionInNanoInJniTest {
private static final long timestamp3 = (timestamp1 + 10) * 1000_000 + 123456;
private static final Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
private static final String date1 = format.format(new Date(timestamp1));
private static final String date4 = format.format(new Date(timestamp1 + 10l));
private static final String date4 = format.format(new Date(timestamp1 + 10L));
private static final String date2 = date1 + "123455";
private static final String date3 = date4 + "123456";

View File

@ -44,7 +44,7 @@ public class UnsignedNumberJniTest {
Assert.assertEquals(127, rs.getByte(2));
Assert.assertEquals(32767, rs.getShort(3));
Assert.assertEquals(2147483647, rs.getInt(4));
Assert.assertEquals(9223372036854775807l, rs.getLong(5));
Assert.assertEquals(9223372036854775807L, rs.getLong(5));
}
} catch (SQLException e) {
e.printStackTrace();

View File

@ -45,7 +45,7 @@ public class UnsignedNumberRestfulTest {
Assert.assertEquals(127, rs.getByte(2));
Assert.assertEquals(32767, rs.getShort(3));
Assert.assertEquals(2147483647, rs.getInt(4));
Assert.assertEquals(9223372036854775807l, rs.getLong(5));
Assert.assertEquals(9223372036854775807L, rs.getLong(5));
}
} catch (SQLException e) {
e.printStackTrace();

View File

@ -95,13 +95,13 @@ public class RestfulResultSetTest {
public void getBigDecimal() throws SQLException {
BigDecimal f1 = rs.getBigDecimal("f1");
long actual = (f1 == null) ? 0 : f1.longValue();
Assert.assertEquals(1609430400000l, actual);
Assert.assertEquals(1609430400000L, actual);
BigDecimal f2 = rs.getBigDecimal("f2");
Assert.assertEquals(1, f2.intValue());
BigDecimal f3 = rs.getBigDecimal("f3");
Assert.assertEquals(100l, f3.longValue());
Assert.assertEquals(100L, f3.longValue());
BigDecimal f4 = rs.getBigDecimal("f4");
Assert.assertEquals(3.1415f, f4.floatValue(), 0.00000f);
@ -125,13 +125,13 @@ public class RestfulResultSetTest {
Assert.assertEquals(1, Ints.fromByteArray(f2));
byte[] f3 = rs.getBytes("f3");
Assert.assertEquals(100l, Longs.fromByteArray(f3));
Assert.assertEquals(100L, Longs.fromByteArray(f3));
byte[] f4 = rs.getBytes("f4");
Assert.assertEquals(3.1415f, Float.valueOf(new String(f4)), 0.000000f);
Assert.assertEquals(3.1415f, Float.parseFloat(new String(f4)), 0.000000f);
byte[] f5 = rs.getBytes("f5");
Assert.assertEquals(3.1415926, Double.valueOf(new String(f5)), 0.000000f);
Assert.assertEquals(3.1415926, Double.parseDouble(new String(f5)), 0.000000f);
byte[] f6 = rs.getBytes("f6");
Assert.assertEquals("abc", new String(f6));
@ -222,7 +222,7 @@ public class RestfulResultSetTest {
Object f3 = rs.getObject("f3");
Assert.assertEquals(Long.class, f3.getClass());
Assert.assertEquals(100l, f3);
Assert.assertEquals(100L, f3);
Object f4 = rs.getObject("f4");
Assert.assertEquals(Float.class, f4.getClass());
@ -434,7 +434,7 @@ public class RestfulResultSetTest {
@Test(expected = SQLFeatureNotSupportedException.class)
public void updateLong() throws SQLException {
rs.updateLong(1, 1l);
rs.updateLong(1, 1L);
}
@Test(expected = SQLFeatureNotSupportedException.class)

View File

@ -10,17 +10,17 @@ public class OSUtilsTest {
@Test
public void inWindows() {
Assert.assertEquals(OS.indexOf("win") >= 0, OSUtils.isWindows());
Assert.assertEquals(OS.contains("win"), OSUtils.isWindows());
}
@Test
public void isMac() {
Assert.assertEquals(OS.indexOf("mac") >= 0, OSUtils.isMac());
Assert.assertEquals(OS.contains("mac"), OSUtils.isMac());
}
@Test
public void isLinux() {
Assert.assertEquals(OS.indexOf("nux") >= 0, OSUtils.isLinux());
Assert.assertEquals(OS.contains("nux"), OSUtils.isLinux());
}
@Before

View File

@ -21,47 +21,4 @@ public class TimestampUtil {
SimpleDateFormat sdf = new SimpleDateFormat(datetimeFormat);
return sdf.format(new Date(time));
}
public static class TimeTuple {
public Long start;
public Long end;
public Long timeGap;
TimeTuple(long start, long end, long timeGap) {
this.start = start;
this.end = end;
this.timeGap = timeGap;
}
}
public static TimeTuple range(long start, long timeGap, long size) {
long now = System.currentTimeMillis();
if (timeGap < 1)
timeGap = 1;
if (start == 0)
start = now - size * timeGap;
// 如果size小于1异常
if (size < 1)
throw new IllegalArgumentException("size less than 1.");
// 如果timeGap为1已经超长需要前移start
if (start + size > now) {
start = now - size;
return new TimeTuple(start, now, 1);
}
long end = start + (long) (timeGap * size);
if (end > now) {
//压缩timeGap
end = now;
double gap = (end - start) / (size * 1.0f);
if (gap < 1.0f) {
timeGap = 1;
start = end - size;
} else {
timeGap = (long) gap;
end = start + (long) (timeGap * size);
}
}
return new TimeTuple(start, end, timeGap);
}
}

View File

@ -3,8 +3,6 @@ package com.taosdata.jdbc.utils;
import org.junit.Assert;
import org.junit.Test;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
public class UtilsTest {
@ -19,14 +17,14 @@ public class UtilsTest {
Assert.assertEquals("\\'\\'\\'\\'\\'a\\'", news);
// given
s = "\'''''a\\'";
s = "'''''a\\'";
// when
news = Utils.escapeSingleQuota(s);
// then
Assert.assertEquals("\\'\\'\\'\\'\\'a\\'", news);
// given
s = "\'\'\'\''a\\'";
s = "'''''a\\'";
// when
news = Utils.escapeSingleQuota(s);
// then

View File

@ -403,6 +403,20 @@ class CTaosInterface(object):
"""
return CTaosInterface.libtaos.taos_affected_rows(result)
@staticmethod
def insertLines(connection, lines):
'''
insert through lines protocol
@lines: list of str
@rtype: tsdb error codes
'''
numLines = len(lines)
c_lines_type = ctypes.c_char_p*numLines
c_lines = c_lines_type()
for i in range(numLines):
c_lines[i] = ctypes.c_char_p(lines[i].encode('utf-8'))
return CTaosInterface.libtaos.taos_insert_lines(connection, c_lines, ctypes.c_int(numLines))
@staticmethod
def subscribe(connection, restart, topic, sql, interval):
"""Create a subscription

View File

@ -66,6 +66,14 @@ class TDengineConnection(object):
self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub)
def insertLines(self, lines):
"""
insert lines through line protocol
"""
if self._conn is None:
return None
return CTaosInterface.insertLines(self._conn, lines)
def cursor(self):
"""Return a new Cursor object using the connection.
"""

View File

@ -150,6 +150,8 @@ static void *dnodeProcessMPeerQueue(void *param) {
SMnodeMsg *pPeerMsg;
int32_t type;
void * unUsed;
setThreadName("dnodeMPeerQ");
while (1) {
if (taosReadQitemFromQset(tsMPeerQset, &type, (void **)&pPeerMsg, &unUsed) == 0) {

View File

@ -155,6 +155,8 @@ static void *dnodeProcessMReadQueue(void *param) {
int32_t type;
void * unUsed;
setThreadName("dnodeMReadQ");
while (1) {
if (taosReadQitemFromQset(tsMReadQset, &type, (void **)&pRead, &unUsed) == 0) {
dDebug("qset:%p, mnode read got no message from qset, exiting", tsMReadQset);

View File

@ -168,7 +168,9 @@ static void *dnodeProcessMWriteQueue(void *param) {
SMnodeMsg *pWrite;
int32_t type;
void * unUsed;
setThreadName("dnodeMWriteQ");
while (1) {
if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWrite, &unUsed) == 0) {
dDebug("qset:%p, mnode write got no message from qset, exiting", tsMWriteQset);

View File

@ -245,6 +245,8 @@ static void* telemetryThread(void* param) {
clock_gettime(CLOCK_REALTIME, &end);
end.tv_sec += 300; // wait 5 minutes before send first report
setThreadName("telemetryThrd");
while (!tsExit) {
int r = 0;
struct timespec ts = end;

View File

@ -103,6 +103,8 @@ static void *dnodeProcessMgmtQueue(void *wparam) {
int32_t qtype;
void * handle;
setThreadName("dnodeMgmtQ");
while (1) {
if (taosReadQitemFromQset(pPool->qset, &qtype, (void **)&pMgmt, &handle) == 0) {
dDebug("qdnode mgmt got no message from qset:%p, , exit", pPool->qset);

View File

@ -118,6 +118,11 @@ static void *dnodeProcessReadQueue(void *wparam) {
SVReadMsg * pRead;
int32_t qtype;
void * pVnode;
char name[16];
memset(name, 0, 16);
snprintf(name, 16, "%s-dnReadQ", pPool->name);
setThreadName(name);
while (1) {
if (taosReadQitemFromQset(pPool->qset, &qtype, (void **)&pRead, &pVnode) == 0) {

View File

@ -191,6 +191,8 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
taosBlockSIGPIPE();
dDebug("dnode vwrite worker:%d is running", pWorker->workerId);
setThreadName("dnodeWriteQ");
while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
if (numOfMsgs == 0) {

View File

@ -91,6 +91,8 @@ static void *dnodeOpenVnode(void *param) {
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
setThreadName("dnodeOpenVnode");
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
int32_t vgId = pThread->vnodeList[v];
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", vgId, tsOpenVnodes, tsTotalVnodes);

View File

@ -111,10 +111,12 @@ typedef struct TAOS_MULTI_BIND {
} TAOS_MULTI_BIND;
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);

View File

@ -101,6 +101,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0218) //"Table does not exist")
#define TSDB_CODE_TSC_EXCEED_SQL_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0219) //"SQL statement too long check maxSQLLength config")
#define TSDB_CODE_TSC_FILE_EMPTY TAOS_DEF_ERROR_CODE(0, 0x021A) //"File is empty")
#define TSDB_CODE_TSC_LINE_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x021B) //"Syntax error in Line")
// mnode
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed")

View File

@ -104,6 +104,8 @@ static void shellFreeTbnames() {
static void *shellCheckThreadFp(void *arg) {
ShellThreadObj *pThread = (ShellThreadObj *)arg;
setThreadName("shellCheckThrd");
int32_t interval = tbNum / pThread->totalThreads + 1;
int32_t start = pThread->threadIndex * interval;
int32_t end = (pThread->threadIndex + 1) * interval;

View File

@ -336,6 +336,8 @@ void *shellLoopQuery(void *arg) {
TAOS *con = (TAOS *)arg;
setThreadName("shellLoopQuery");
pthread_cleanup_push(cleanup_handler, NULL);
char *command = malloc(MAX_COMMAND_SIZE);

View File

@ -223,6 +223,8 @@ static void shellSourceFile(TAOS *con, char *fptr) {
void* shellImportThreadFp(void *arg)
{
ShellThreadObj *pThread = (ShellThreadObj*)arg;
setThreadName("shellImportThrd");
for (int f = 0; f < shellSQLFileNum; ++f) {
if (f % pThread->totalThreads == pThread->threadIndex) {
char *SQLFileName = shellSQLFiles[f];

View File

@ -336,6 +336,8 @@ void *shellLoopQuery(void *arg) {
TAOS *con = (TAOS *)arg;
setThreadName("shellLoopQuery");
pthread_cleanup_push(cleanup_handler, NULL);
char *command = malloc(MAX_COMMAND_SIZE);

View File

@ -26,6 +26,8 @@ void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) {
}
void *cancelHandler(void *arg) {
setThreadName("cancelHandler");
while(1) {
if (tsem_wait(&cancelSem) != 0) {
taosMsleep(10);

File diff suppressed because it is too large Load Diff

View File

@ -1474,6 +1474,8 @@ static void* taosDumpOutWorkThreadFp(void *arg)
STableRecord tableRecord;
int fd;
setThreadName("dumpOutWorkThrd");
char tmpBuf[4096] = {0};
sprintf(tmpBuf, ".tables.tmp.%d", pThread->threadIndex);
fd = open(tmpBuf, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
@ -2571,6 +2573,8 @@ static int taosDumpInOneFile(TAOS* taos, FILE* fp, char* fcharset,
static void* taosDumpInWorkThreadFp(void *arg)
{
SThreadParaObj *pThread = (SThreadParaObj*)arg;
setThreadName("dumpInWorkThrd");
for (int32_t f = 0; f < g_tsSqlFileNum; ++f) {
if (f % pThread->totalThreads == pThread->threadIndex) {
char *SQLFileName = g_tsDumpInSqlFiles[f];

View File

@ -1113,6 +1113,7 @@ static void *sdbWorkerFp(void *pWorker) {
void * unUsed;
taosBlockSIGPIPE();
setThreadName("sdbWorker");
while (1) {
int32_t numOfMsgs = taosReadAllQitemsFromQset(tsSdbWQset, tsSdbWQall, &unUsed);

View File

@ -210,6 +210,25 @@ extern "C" {
#define PRIzu "zu"
#endif
#if defined(_TD_LINUX_64) || defined(_TD_LINUX_32) || defined(_TD_MIPS_64) || defined(_TD_ARM_32) || defined(_TD_ARM_64) || defined(_TD_DARWIN_64)
#if defined(_TD_DARWIN_64)
// MacOS
#if !defined(_GNU_SOURCE)
#define setThreadName(name) do { pthread_setname_np((name)); } while (0)
#else
// pthread_setname_np not defined
#define setThreadName(name)
#endif
#else
// Linux, length of name must <= 16 (the last '\0' included)
#define setThreadName(name) do { prctl(PR_SET_NAME, (name)); } while (0)
#endif
#else
// Windows
#define setThreadName(name)
#endif
#ifdef __cplusplus
}
#endif

View File

@ -85,6 +85,7 @@ extern "C" {
#include <sys/eventfd.h>
#include <sys/resource.h>
#include <sys/sendfile.h>
#include <sys/prctl.h>
#if !(defined(_ALPINE))
#include <error.h>

View File

@ -41,6 +41,8 @@ static semaphore_t sem_exit;
static void* sem_thread_routine(void *arg) {
(void)arg;
setThreadName("sem_thrd");
sem_port = mach_task_self();
kern_return_t ret = semaphore_create(sem_port, &sem_exit, SYNC_POLICY_FIFO, 0);
if (ret != KERN_SUCCESS) {

View File

@ -32,6 +32,7 @@ static volatile int timer_stop = 0;
static void* timer_routine(void *arg) {
(void)arg;
setThreadName("timer");
int r = 0;
struct timespec to = {0};

View File

@ -38,6 +38,8 @@ static void *taosProcessAlarmSignal(void *tharg) {
struct sigevent sevent = {{0}};
setThreadName("alarmSignal");
#ifdef _ALPINE
sevent.sigev_notify = SIGEV_THREAD;
sevent.sigev_value.sival_int = syscall(__NR_gettid);

View File

@ -70,6 +70,8 @@ static void *httpProcessResultQueue(void *param) {
int32_t type;
void * unUsed;
setThreadName("httpResultQ");
while (1) {
if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) {
httpDebug("qset:%p, http queue got no message from qset, exiting", tsHttpQset);

View File

@ -117,6 +117,7 @@ static void httpProcessHttpData(void *param) {
int32_t fdNum;
taosSetMaskSIGPIPE();
setThreadName("httpData");
while (1) {
struct epoll_event events[HTTP_MAX_EVENTS];
@ -208,6 +209,7 @@ static void *httpAcceptHttpConnection(void *arg) {
int32_t totalFds = 0;
taosSetMaskSIGPIPE();
setThreadName("httpAcceptConn");
pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort);

View File

@ -114,6 +114,7 @@ int32_t monStartSystem() {
static void *monThreadFunc(void *param) {
monDebug("starting to initialize monitor module ...");
setThreadName("monThrd");
while (1) {
static int32_t accessTimes = 0;

View File

@ -100,6 +100,8 @@ void mqttPublishCallback(void** unused, struct mqtt_response_publish* published)
}
void* mqttClientRefresher(void* client) {
setThreadName("mqttCliRefresh");
while (tsMqttIsRuning) {
mqtt_sync((struct mqtt_client*)client);
taosMsleep(100);
@ -141,4 +143,4 @@ void mqttReconnectClient(struct mqtt_client* client, void** unused) {
mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz);
mqtt_connect(client, tsMqttClientId, NULL, NULL, 0, tsMqttUser, tsMqttPass, MQTT_CONNECT_CLEAN_SESSION, 400);
mqtt_subscribe(client, tsMqttTopic, 0);
}
}

View File

@ -254,7 +254,7 @@ typedef struct tSqlExpr {
struct SArray *paramList; // function parameters list
} Expr;
uint32_t functionId; // function id, todo remove it
int32_t functionId; // function id, todo remove it
SStrToken columnName; // table column info
tVariant value; // the use input value
SStrToken exprToken; // original sql expr string

View File

@ -2427,7 +2427,7 @@ static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bool
if (pQueryAttr->pointInterpQuery && pQueryAttr->interval.interval == 0) {
if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
qDebug(msg, pQInfo, "interp", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
qDebug(msg, pQInfo->qId, "interp", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY);
}
@ -2438,7 +2438,7 @@ static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bool
if (pQueryAttr->interval.interval == 0) {
if (onlyFirstQuery(pQueryAttr)) {
if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
qDebug(msg, pQInfo, "only-first", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
qDebug(msg, pQInfo->qId, "only-first", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY);
@ -2449,7 +2449,7 @@ static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bool
pQueryAttr->needReverseScan = false;
} else if (onlyLastQuery(pQueryAttr) && notContainSessionOrStateWindow(pQueryAttr)) {
if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
qDebug(msg, pQInfo, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey,
qDebug(msg, pQInfo->qId, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey,
pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY);
@ -2464,7 +2464,7 @@ static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bool
if (stableQuery) {
if (onlyFirstQuery(pQueryAttr)) {
if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
qDebug(msg, pQInfo, "only-first stable", pQueryAttr->order.order, TSDB_ORDER_ASC,
qDebug(msg, pQInfo->qId, "only-first stable", pQueryAttr->order.order, TSDB_ORDER_ASC,
pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY);
@ -2475,7 +2475,7 @@ static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bool
pQueryAttr->needReverseScan = false;
} else if (onlyLastQuery(pQueryAttr)) {
if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
qDebug(msg, pQInfo, "only-last stable", pQueryAttr->order.order, TSDB_ORDER_DESC,
qDebug(msg, pQInfo->qId, "only-last stable", pQueryAttr->order.order, TSDB_ORDER_DESC,
pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
SWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY);
@ -6594,10 +6594,19 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
if (isNull(val, type)) {
continue;
}
int dummy;
void* res = taosHashGet(pInfo->pSet, val, bytes);
size_t keyLen = 0;
if (IS_VAR_DATA_TYPE(pOperator->pExpr->base.colType)) {
tstr* var = (tstr*)(val);
keyLen = varDataLen(var);
} else {
keyLen = bytes;
}
int dummy;
void* res = taosHashGet(pInfo->pSet, val, keyLen);
if (res == NULL) {
taosHashPut(pInfo->pSet, val, bytes, &dummy, sizeof(dummy));
taosHashPut(pInfo->pSet, val, keyLen, &dummy, sizeof(dummy));
char* start = pResultColInfoData->pData + bytes * pInfo->pRes->info.rows;
memcpy(start, val, bytes);
pRes->info.rows += 1;
@ -6624,6 +6633,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->operatorType = OP_Distinct;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;

View File

@ -242,6 +242,7 @@ static void *taosAcceptTcpConnection(void *arg) {
pServerObj = (SServerObj *)arg;
tDebug("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
setThreadName("acceptTcpConn");
while (1) {
socklen_t addrlen = sizeof(caddr);
@ -528,6 +529,11 @@ static void *taosProcessTcpData(void *param) {
SFdObj *pFdObj;
struct epoll_event events[maxEvents];
SRecvInfo recvInfo;
char name[16];
memset(name, 0, sizeof(name));
snprintf(name, 16, "%s-tcpData", pThreadObj->label);
setThreadName(name);
while (1) {
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME);

View File

@ -195,6 +195,8 @@ static void *taosRecvUdpData(void *param) {
tDebug("%s UDP thread is created, index:%d", pConn->label, pConn->index);
char *msg = pConn->buffer;
setThreadName("recvUdpData");
while (1) {
dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
if (dataLen <= 0) {

View File

@ -47,6 +47,8 @@ static int tcount = 0;
static void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param;
SRpcMsg rpcMsg = {0};
setThreadName("sendCliReq");
tDebug("thread:%d, start to send request", pInfo->index);

View File

@ -39,8 +39,10 @@ static int terror = 0;
static void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param;
SRpcMsg rpcMsg, rspMsg;
SRpcMsg rpcMsg, rspMsg;
setThreadName("sendSrvReq");
tDebug("thread:%d, start to send request", pInfo->index);
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {

View File

@ -263,6 +263,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
}
void *syncRestoreData(void *param) {
setThreadName("syncRestoreData");
int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) {

View File

@ -415,6 +415,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
}
void *syncRetrieveData(void *param) {
setThreadName("syncRetrievData");
int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) {

View File

@ -195,6 +195,8 @@ static void *syncProcessTcpData(void *param) {
SConnObj * pConn = NULL;
struct epoll_event events[maxEvents];
setThreadName("syncTcpData");
void *buffer = malloc(pInfo->bufferSize);
taosBlockSIGPIPE();
@ -257,6 +259,7 @@ static void *syncAcceptPeerTcpConnection(void *argv) {
SPoolInfo *pInfo = &pPool->info;
taosBlockSIGPIPE();
setThreadName("acceptTcpConn");
while (1) {
struct sockaddr_in clientAddr;

View File

@ -48,6 +48,8 @@ void *sendRequest(void *param) {
SInfo * pInfo = (SInfo *)param;
SRpcMsg rpcMsg = {0};
setThreadName("sendCliReq");
uDebug("thread:%d, start to send request", pInfo->index);
while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {

View File

@ -178,6 +178,8 @@ void *processWriteQueue(void *param) {
int type;
void *item;
setThreadName("writeQ");
while (1) {
int ret = taosReadQitem(qhandle, &type, &item);
if (ret <= 0) {

View File

@ -158,6 +158,8 @@ static void *tsdbLoopCommit(void *arg) {
STsdbRepo * pRepo = NULL;
TSDB_REQ_T req;
setThreadName("tsdbCommit");
while (true) {
pthread_mutex_lock(&(pQueue->lock));
@ -208,4 +210,4 @@ void tsdbDecCommitRef(int vgId) {
int refCount = atomic_sub_fetch_32(&tsCommitQueue.refCount, 1);
pthread_cond_broadcast(&(tsCommitQueue.queueNotEmpty));
tsdbDebug("vgId:%d, dec commit queue ref to %d", vgId, refCount);
}
}

View File

@ -656,6 +656,8 @@ void* taosCacheTimedRefresh(void *handle) {
return NULL;
}
setThreadName("cacheTimedRefre");
const int32_t SLEEP_DURATION = 500; //500 ms
int64_t totalTick = pCacheObj->refreshTime / SLEEP_DURATION;

View File

@ -178,6 +178,8 @@ static void *taosThreadToOpenNewFile(void *param) {
char keepName[LOG_FILE_NAME_LEN + 20];
sprintf(keepName, "%s.%d", tsLogObj.logName, tsLogObj.flag);
setThreadName("openNewFile");
tsLogObj.flag ^= 1;
tsLogObj.lines = 0;
char name[LOG_FILE_NAME_LEN + 20];
@ -687,6 +689,8 @@ static void taosWriteLog(SLogBuff *tLogBuff) {
static void *taosAsyncOutputLog(void *param) {
SLogBuff *tLogBuff = (SLogBuff *)param;
setThreadName("asyncOutputLog");
while (1) {
//tsem_wait(&(tLogBuff->buffNotEmpty));

View File

@ -50,7 +50,9 @@ static void *taosNetBindUdpPort(void *sarg) {
struct sockaddr_in server_addr;
struct sockaddr_in clientAddr;
if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
setThreadName("netBindUdpPort");
if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
uError("failed to create UDP socket since %s", strerror(errno));
return NULL;
}
@ -106,13 +108,15 @@ static void *taosNetBindTcpPort(void *sarg) {
struct sockaddr_in server_addr;
struct sockaddr_in clientAddr;
STestInfo *pinfo = sarg;
STestInfo *pinfo = sarg;
int32_t port = pinfo->port;
SOCKET serverSocket;
int32_t addr_len = sizeof(clientAddr);
SOCKET client;
char buffer[BUFFER_SIZE];
setThreadName("netBindTcpPort");
if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
uError("failed to create TCP socket since %s", strerror(errno));
return NULL;

View File

@ -84,6 +84,8 @@ static void *taosThreadToOpenNewNote(void *param) {
char name[NOTE_FILE_NAME_LEN * 2];
SNoteObj *pNote = (SNoteObj *)param;
setThreadName("openNewNote");
pNote->flag ^= 1;
pNote->lines = 0;
sprintf(name, "%s.%d", pNote->name, pNote->flag);

View File

@ -122,6 +122,8 @@ void *taosProcessSchedQueue(void *scheduler) {
SSchedQueue *pSched = (SSchedQueue *)scheduler;
int ret = 0;
setThreadName("schedQ");
while (1) {
if ((ret = tsem_wait(&pSched->fullSem)) != 0) {
uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
@ -234,4 +236,4 @@ void taosDumpSchedulerStatus(void *qhandle, void *tmrId) {
}
taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
}
}

View File

@ -35,6 +35,8 @@ void *addRef(void *param) {
SRefSpace *pSpace = (SRefSpace *)param;
int id;
setThreadName("addRef");
for (int i=0; i < pSpace->steps; ++i) {
printf("a");
id = random() % pSpace->refNum;
@ -52,6 +54,8 @@ void *removeRef(void *param) {
SRefSpace *pSpace = (SRefSpace *)param;
int id, code;
setThreadName("removeRef");
for (int i=0; i < pSpace->steps; ++i) {
printf("d");
id = random() % pSpace->refNum;
@ -70,6 +74,8 @@ void *acquireRelease(void *param) {
SRefSpace *pSpace = (SRefSpace *)param;
int id;
setThreadName("acquireRelease");
for (int i=0; i < pSpace->steps; ++i) {
printf("a");
@ -91,6 +97,8 @@ void myfree(void *p) {
void *openRefSpace(void *param) {
SRefSpace *pSpace = (SRefSpace *)param;
setThreadName("openRefSpace");
printf("c");
pSpace->rsetId = taosOpenRef(50, myfree);

View File

@ -61,6 +61,8 @@ static void vnodeProcessBackupMsg(SVBackupMsg *pMsg) {
}
static void *vnodeBackupFunc(void *param) {
setThreadName("vnodeBackup");
while (1) {
SVBackupMsg *pMsg = NULL;
if (taosReadQitemFromQset(tsVBackupQset, NULL, (void **)&pMsg, NULL) == 0) {

View File

@ -188,6 +188,8 @@ static void vnodeProcessMWorkerMsg(SVMWorkerMsg *pMsg) {
}
static void *vnodeMWorkerFunc(void *param) {
setThreadName("vnodeMWorker");
while (1) {
SVMWorkerMsg *pMsg = NULL;
if (taosReadQitemFromQset(tsVMWorkerQset, NULL, (void **)&pMsg, NULL) == 0) {

View File

@ -192,6 +192,7 @@ static void walFsyncAll() {
static void *walThreadFunc(void *param) {
int stop = 0;
setThreadName("walThrd");
while (1) {
walUpdateSeq();
walFsyncAll();

View File

@ -5,6 +5,8 @@ IF (TD_LINUX)
AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(demo apitest.c)
TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread )
ADD_EXECUTABLE(sml schemaless.c)
TARGET_LINK_LIBRARIES(sml taos_static trpc tutil pthread )
ADD_EXECUTABLE(subscribe subscribe.c)
TARGET_LINK_LIBRARIES(subscribe taos_static trpc tutil pthread )
ADD_EXECUTABLE(epoll epoll.c)

View File

@ -964,21 +964,31 @@ int32_t verify_schema_less(TAOS* taos) {
usleep(100000);
char* lines[] = {
"st,t1=3i,t2=4,t3=\"t3\" c1=3i,c3=L\"passit\",c2=false,c4=4 1626006833639000000",
"st,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin\",c2=true,c4=5,c5=5 1626006833640000000",
"ste,t2=5,t3=L\"ste\" c1=true,c2=4,c3=\"iam\" 1626056811823316532",
"st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833642000000",
"ste,t2=5,t3=L\"ste2\" c3=\"iamszhou\",c4=false 1626056811843316532",
"ste,t2=5,t3=L\"ste2\" c3=\"iamszhou\",c4=false,c5=32b,c6=64s,c7=32w,c8=88.88f 1626056812843316532",
"st,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin\",c2=true,c4=5,c5=5,c6=7u 1626006933640000000",
"stf,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin\",c2=true,c4=5,c5=5,c6=7u 1626006933640000000",
"stf,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin_stf\",c2=false,c5=5,c6=7u 1626006933641a"
"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns",
"ste,t2=5f64,t3=L\"ste\" c1=true,c2=4i64,c3=\"iam\" 1626056811823316532ns",
"st,t1=4i64,t2=5f64,t3=\"t4\" c1=3i64,c3=L\"passitagain\",c2=true,c4=5f64 1626006833642000000ns",
"ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false 1626056811843316532ns",
"ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false,c5=32i8,c6=64i16,c7=32i32,c8=88.88f32 1626056812843316532ns",
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns",
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns",
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641000000ns"
};
// int code = taos_insert_lines(taos, lines , sizeof(lines)/sizeof(char*));
int code = taos_insert_lines(taos, &lines[0], 1);
code = taos_insert_lines(taos, &lines[1], 1);
int code = 0;
code = taos_insert_lines(taos, lines , sizeof(lines)/sizeof(char*));
char* lines2[] = {
"stg,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"stg,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns"
};
code = taos_insert_lines(taos, &lines2[0], 1);
code = taos_insert_lines(taos, &lines2[1], 1);
char* lines3[] = {
"sth,t1=4i64,t2=5f64,t4=5f64,ID=\"childtable\" c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641ms",
"sth,t1=4i64,t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933654ms"
};
code = taos_insert_lines(taos, lines3, 2);
return code;
}
@ -1000,10 +1010,8 @@ int main(int argc, char *argv[]) {
printf("client info: %s\n", info);
printf("************ verify shemaless *************\n");
int code = verify_schema_less(taos);
if (code == 0) {
return code;
}
verify_schema_less(taos);
printf("************ verify query *************\n");
verify_query(taos);

View File

@ -0,0 +1,161 @@
#include "taos.h"
#include "taoserror.h"
#include "os.h"
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
int numSuperTables = 8;
int numChildTables = 1024;
int numRowsPerChildTable = 128;
void shuffle(char**lines, size_t n)
{
if (n > 1)
{
size_t i;
for (i = 0; i < n - 1; i++)
{
size_t j = i + rand() / (RAND_MAX / (n - i) + 1);
char* t = lines[j];
lines[j] = lines[i];
lines[i] = t;
}
}
}
static int64_t getTimeInUs() {
struct timeval systemTime;
gettimeofday(&systemTime, NULL);
return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec;
}
int main(int argc, char* argv[]) {
TAOS_RES *result;
const char* host = "127.0.0.1";
const char* user = "root";
const char* passwd = "taosdata";
taos_options(TSDB_OPTION_TIMEZONE, "GMT-8");
TAOS* taos = taos_connect(host, user, passwd, "", 0);
if (taos == NULL) {
printf("\033[31mfailed to connect to db, reason:%s\033[0m\n", taos_errstr(taos));
exit(1);
}
char* info = taos_get_server_info(taos);
printf("server info: %s\n", info);
info = taos_get_client_info(taos);
printf("client info: %s\n", info);
result = taos_query(taos, "drop database if exists db;");
taos_free_result(result);
usleep(100000);
result = taos_query(taos, "create database db precision 'ms';");
taos_free_result(result);
usleep(100000);
(void)taos_select_db(taos, "db");
time_t ct = time(0);
int64_t ts = ct * 1000;
char* lineFormat = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms";
char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*));
int l = 0;
for (int i = 0; i < numSuperTables; ++i) {
for (int j = 0; j < numChildTables; ++j) {
for (int k = 0; k < numRowsPerChildTable; ++k) {
char* line = calloc(512, 1);
snprintf(line, 512, lineFormat, i, j, ts + 10 * l);
lines[l] = line;
++l;
}
}
}
shuffle(lines, numSuperTables * numChildTables * numRowsPerChildTable);
printf("%s\n", "begin taos_insert_lines");
int64_t begin = getTimeInUs();
int32_t code = taos_insert_lines(taos, lines, numSuperTables * numChildTables * numRowsPerChildTable);
int64_t end = getTimeInUs();
printf("code: %d, %s. time used: %"PRId64"\n", code, tstrerror(code), end-begin);
char* lines_000_0[] = {
"sta1,id=sta1_1,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=255u8,t6=32770u16,t7=2147483699u32,t8=9223372036854775899u64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639000us"
};
code = taos_insert_lines(taos, lines_000_0 , sizeof(lines_000_0)/sizeof(char*));
if (0 == code) {
printf("taos_insert_lines() lines_000_0 should return error\n");
return -1;
}
char* lines_000_1[] = {
"sta2,id=\"sta2_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=255u8,t6=32770u16,t7=2147483699u32,t8=9223372036854775899u64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639001"
};
code = taos_insert_lines(taos, lines_000_1 , sizeof(lines_000_1)/sizeof(char*));
if (0 == code) {
printf("taos_insert_lines() lines_000_1 should return error\n");
return -1;
}
char* lines_000_2[] = {
"sta3,id=\"sta3_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 0"
};
code = taos_insert_lines(taos, lines_000_2 , sizeof(lines_000_2)/sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_000_2 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_001_0[] = {
"sta4,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639000us",
};
code = taos_insert_lines(taos, lines_001_0 , sizeof(lines_001_0)/sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_001_0 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_001_1[] = {
"sta5,id=\"sta5_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639001"
};
code = taos_insert_lines(taos, lines_001_1 , sizeof(lines_001_1)/sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_001_1 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_001_2[] = {
"sta6,id=\"sta6_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 0"
};
code = taos_insert_lines(taos, lines_001_2 , sizeof(lines_001_2)/sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_001_2 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
char* lines_002[] = {
"stb,id=\"stb_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639000000ns",
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639019us",
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833640ms",
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006834s"
};
code = taos_insert_lines(taos, lines_002 , sizeof(lines_002)/sizeof(char*));
if (0 != code) {
printf("taos_insert_lines() lines_002 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1;
}
return 0;
}

View File

@ -27,6 +27,7 @@ python3 ./test.py -f insert/bug3654.py
python3 ./test.py -f insert/insertDynamicColBeforeVal.py
python3 ./test.py -f insert/in_function.py
python3 ./test.py -f insert/modify_column.py
python3 ./test.py -f insert/line_insert.py
#table
python3 ./test.py -f table/alter_wal0.py
@ -361,4 +362,6 @@ python3 test.py -f alter/alter_keep.py
python3 test.py -f alter/alter_cacheLastRow.py
python3 ./test.py -f query/querySession.py
python3 test.py -f alter/alter_create_exception.py
python3 ./test.py -f insert/flushwhiledrop.py
#======================p4-end===============

View File

@ -0,0 +1,67 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import threading
from util.log import *
from util.cases import *
from util.sql import *
from time import sleep
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.numberOfRecords = 15000
self.ts = 1601481600000
def run(self):
tdSql.execute('create database test cache 1 blocks 3')
tdSql.execute('use test')
tdSql.execute('create table tb(ts timestamp, c1 timestamp, c2 int, c3 bigint, c4 float, c5 double, c6 binary(8), c7 smallint, c8 tinyint, c9 bool, c10 nchar(8))')
threads = []
t1 = threading.Thread(target=self.insertAndFlush, args=())
threads.append(t1)
t2 = threading.Thread(target=self.drop, args=())
threads.append(t2)
for t in threads:
t.setDaemon(True)
t.start()
for t in threads:
t.join()
def insertAndFlush(self):
finish = 0
currts = self.ts
while(finish < self.numberOfRecords):
sql = "insert into tb values"
for i in range(finish, self.numberOfRecords):
sql += "(%d, 1019774612, 29931, 1442173978, 165092.468750, 1128.643179, 'MOCq1pTu', 18405, 82, 0, 'g0A6S0Fu')" % (currts + i)
finish = i + 1
if (1048576 - len(sql)) < 16384:
break
tdSql.execute(sql)
def drop(self):
sleep(30)
tdSql.execute('drop database test')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -0,0 +1,91 @@
###################################################################
# Copyright (c) 2021 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self._conn = conn
def run(self):
print("running {}".format(__file__))
tdSql.execute("drop database if exists test")
tdSql.execute("create database if not exists test precision 'us'")
tdSql.execute('use test')
tdSql.execute('create stable ste(ts timestamp, f int) tags(t1 bigint)')
lines = [ "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns",
"ste,t2=5f64,t3=L\"ste\" c1=true,c2=4i64,c3=\"iam\" 1626056811823316532ns",
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns",
"st,t1=4i64,t2=5f64,t3=\"t4\" c1=3i64,c3=L\"passitagain\",c2=true,c4=5f64 1626006833642000000ns",
"ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false 1626056811843316532ns",
"ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false,c5=32i8,c6=64i16,c7=32i32,c8=88.88f32 1626056812843316532ns",
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns",
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641000000ns"
]
code = self._conn.insertLines(lines)
print("insertLines result {}".format(code))
lines2 = [ "stg,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"stg,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns"
]
code = self._conn.insertLines([ lines2[0] ])
print("insertLines result {}".format(code))
self._conn.insertLines([ lines2[1] ])
print("insertLines result {}".format(code))
tdSql.query("select * from st")
tdSql.checkRows(4)
tdSql.query("select * from ste")
tdSql.checkRows(3)
tdSql.query("select * from stf")
tdSql.checkRows(2)
tdSql.query("select * from stg")
tdSql.checkRows(2)
tdSql.query("show tables")
tdSql.checkRows(8)
tdSql.query("describe stf")
tdSql.checkData(2, 2, 14)
self._conn.insertLines([
"sth,t1=4i64,t2=5f64,t4=5f64,ID=\"childtable\" c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641ms",
"sth,t1=4i64,t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933654ms"
])
tdSql.query('select tbname, * from sth')
tdSql.checkRows(2)
tdSql.query('select tbname, * from childtable')
tdSql.checkRows(1)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

Some files were not shown because too many files have changed in this diff Show More