Merge branch 'develop' into feature/TD-1413

This commit is contained in:
Hongze Cheng 2020-11-19 01:54:44 +00:00
commit 8c89576d78
25 changed files with 3554 additions and 244 deletions

View File

@ -84,9 +84,9 @@ typedef struct SRetrieveSupport {
} SRetrieveSupport; } SRetrieveSupport;
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc, int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc,
SColumnModel **pFinalModel, uint32_t nBufferSize); SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSize);
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel* pFFModel,
int32_t numOfVnodes); int32_t numOfVnodes);
int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data, int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data,

View File

@ -282,6 +282,7 @@ int tscSetMgmtEpSetFromCfg(const char *first, const char *second);
bool tscSetSqlOwner(SSqlObj* pSql); bool tscSetSqlOwner(SSqlObj* pSql);
void tscClearSqlOwner(SSqlObj* pSql); void tscClearSqlOwner(SSqlObj* pSql);
int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize);
void* malloc_throw(size_t size); void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size); void* calloc_throw(size_t nmemb, size_t size);

View File

@ -30,8 +30,6 @@ typedef struct SCompareParam {
int32_t groupOrderType; int32_t groupOrderType;
} SCompareParam; } SCompareParam;
static void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize);
int32_t treeComparator(const void *pLeft, const void *pRight, void *param) { int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
int32_t pLeftIdx = *(int32_t *)pLeft; int32_t pLeftIdx = *(int32_t *)pLeft;
int32_t pRightIdx = *(int32_t *)pRight; int32_t pRightIdx = *(int32_t *)pRight;
@ -174,14 +172,14 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
if (pMemBuffer == NULL) { if (pMemBuffer == NULL) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
tscError("%p pMemBuffer is NULL", pMemBuffer); tscError("%p pMemBuffer is NULL", pMemBuffer);
pRes->code = TSDB_CODE_TSC_APP_ERROR; pRes->code = TSDB_CODE_TSC_APP_ERROR;
return; return;
} }
if (pDesc->pColumnModel == NULL) { if (pDesc->pColumnModel == NULL) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
tscError("%p no local buffer or intermediate result format model", pSql); tscError("%p no local buffer or intermediate result format model", pSql);
pRes->code = TSDB_CODE_TSC_APP_ERROR; pRes->code = TSDB_CODE_TSC_APP_ERROR;
return; return;
@ -199,7 +197,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
} }
if (numOfFlush == 0 || numOfBuffer == 0) { if (numOfFlush == 0 || numOfBuffer == 0) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
tscDebug("%p retrieved no data", pSql); tscDebug("%p retrieved no data", pSql);
return; return;
} }
@ -208,7 +206,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
tscError("%p Invalid value of buffer capacity %d and page size %d ", pSql, pDesc->pColumnModel->capacity, tscError("%p Invalid value of buffer capacity %d and page size %d ", pSql, pDesc->pColumnModel->capacity,
pMemBuffer[0]->pageSize); pMemBuffer[0]->pageSize);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
pRes->code = TSDB_CODE_TSC_APP_ERROR; pRes->code = TSDB_CODE_TSC_APP_ERROR;
return; return;
} }
@ -219,7 +217,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (pReducer == NULL) { if (pReducer == NULL) {
tscError("%p failed to create local merge structure, out of memory", pSql); tscError("%p failed to create local merge structure, out of memory", pSql);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer); tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return; return;
} }
@ -336,6 +334,8 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->resColModel = finalmodel; pReducer->resColModel = finalmodel;
pReducer->resColModel->capacity = pReducer->nResultBufSize; pReducer->resColModel->capacity = pReducer->nResultBufSize;
pReducer->finalModel = pFFModel;
assert(pReducer->finalRowSize > 0); assert(pReducer->finalRowSize > 0);
if (pReducer->finalRowSize > 0) { if (pReducer->finalRowSize > 0) {
pReducer->resColModel->capacity /= pReducer->finalRowSize; pReducer->resColModel->capacity /= pReducer->finalRowSize;
@ -533,7 +533,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
tfree(pLocalReducer->pFinalRes); tfree(pLocalReducer->pFinalRes);
tfree(pLocalReducer->discardData); tfree(pLocalReducer->discardData);
tscLocalReducerEnvDestroy(pLocalReducer->pExtMemBuffer, pLocalReducer->pDesc, pLocalReducer->resColModel, tscLocalReducerEnvDestroy(pLocalReducer->pExtMemBuffer, pLocalReducer->pDesc, pLocalReducer->resColModel, pLocalReducer->finalModel,
pLocalReducer->numOfVnode); pLocalReducer->numOfVnode);
for (int32_t i = 0; i < pLocalReducer->numOfBuffer; ++i) { for (int32_t i = 0; i < pLocalReducer->numOfBuffer; ++i) {
tfree(pLocalReducer->pLocalDataSrc[i]); tfree(pLocalReducer->pLocalDataSrc[i]);
@ -657,7 +657,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
} }
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc, int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc,
SColumnModel **pFinalModel, uint32_t nBufferSizes) { SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSizes) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
@ -755,6 +755,18 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
*pFinalModel = createColumnModel(pSchema, (int32_t)size, capacity); *pFinalModel = createColumnModel(pSchema, (int32_t)size, capacity);
memset(pSchema, 0, sizeof(SSchema) * size);
size = tscNumOfFields(pQueryInfo);
for(int32_t i = 0; i < size; ++i) {
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
pSchema[i].bytes = pField->field.bytes;
pSchema[i].type = pField->field.type;
tstrncpy(pSchema[i].name, pField->field.name, tListLen(pSchema[i].name));
}
*pFFModel = createColumnModel(pSchema, (int32_t) size, capacity);
tfree(pSchema); tfree(pSchema);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -765,9 +777,11 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
* @param pFinalModel * @param pFinalModel
* @param numOfVnodes * @param numOfVnodes
*/ */
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel *pFFModel,
int32_t numOfVnodes) { int32_t numOfVnodes) {
destroyColumnModel(pFinalModel); destroyColumnModel(pFinalModel);
destroyColumnModel(pFFModel);
tOrderDescDestroy(pDesc); tOrderDescDestroy(pDesc);
for (int32_t i = 0; i < numOfVnodes; ++i) { for (int32_t i = 0; i < numOfVnodes; ++i) {
@ -875,10 +889,10 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer,
if (pQueryInfo->limit.offset > 0) { if (pQueryInfo->limit.offset > 0) {
if (pQueryInfo->limit.offset < pRes->numOfRows) { if (pQueryInfo->limit.offset < pRes->numOfRows) {
int32_t prevSize = (int32_t)pBeforeFillData->num; int32_t prevSize = (int32_t)pBeforeFillData->num;
tColModelErase(pLocalReducer->resColModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1); tColModelErase(pLocalReducer->finalModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
/* remove the hole in column model */ /* remove the hole in column model */
tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize); tColModelCompact(pLocalReducer->finalModel, pBeforeFillData, prevSize);
pRes->numOfRows -= pQueryInfo->limit.offset; pRes->numOfRows -= pQueryInfo->limit.offset;
pQueryInfo->limit.offset = 0; pQueryInfo->limit.offset = 0;
@ -900,7 +914,7 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer,
pRes->numOfRows -= overflow; pRes->numOfRows -= overflow;
pBeforeFillData->num -= overflow; pBeforeFillData->num -= overflow;
tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize); tColModelCompact(pLocalReducer->finalModel, pBeforeFillData, prevSize);
// set remain data to be discarded, and reset the interpolation information // set remain data to be discarded, and reset the interpolation information
savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo); savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo);
@ -1242,7 +1256,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur
tColModelCompact(pModel, pResBuf, pModel->capacity); tColModelCompact(pModel, pResBuf, pModel->capacity);
if (tscIsSecondStageQuery(pQueryInfo)) { if (tscIsSecondStageQuery(pQueryInfo)) {
doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalRowSize); pLocalReducer->finalRowSize = doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalRowSize);
} }
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
@ -1612,7 +1626,7 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen)
pRes->data = pRes->pLocalReducer->pResultBuf->data; pRes->data = pRes->pLocalReducer->pResultBuf->data;
} }
void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) { int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) {
char* pbuf = calloc(1, pOutput->num * rowSize); char* pbuf = calloc(1, pOutput->num * rowSize);
size_t size = tscNumOfFields(pQueryInfo); size_t size = tscNumOfFields(pQueryInfo);
@ -1647,8 +1661,10 @@ void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t r
} }
assert(finalRowSize <= rowSize); assert(finalRowSize <= rowSize);
memcpy(pOutput->data, pbuf, pOutput->num * finalRowSize); memcpy(pOutput->data, pbuf, pOutput->num * offset);
tfree(pbuf); tfree(pbuf);
tfree(arithSup.data); tfree(arithSup.data);
return offset;
} }

View File

@ -547,7 +547,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo)); int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs); int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs * 2);
int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0; int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0;
@ -787,8 +787,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr = (SSqlFuncMsg *)pMsg; pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
} }
if(tscIsSecondStageQuery(pQueryInfo)) { size_t output = tscNumOfFields(pQueryInfo);
size_t output = tscNumOfFields(pQueryInfo);
if ((tscIsSecondStageQuery(pQueryInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) ||
UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) && (output != tscSqlExprNumOfExprs(pQueryInfo))) {
pQueryMsg->secondStageOutput = htonl((int32_t) output); pQueryMsg->secondStageOutput = htonl((int32_t) output);
SSqlFuncMsg *pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg; SSqlFuncMsg *pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;

View File

@ -1644,6 +1644,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tExtMemBuffer ** pMemoryBuf = NULL; tExtMemBuffer ** pMemoryBuf = NULL;
tOrderDescriptor *pDesc = NULL; tOrderDescriptor *pDesc = NULL;
SColumnModel *pModel = NULL; SColumnModel *pModel = NULL;
SColumnModel *pFinalModel = NULL;
pRes->qhandle = 0x1; // hack the qhandle check pRes->qhandle = 0x1; // hack the qhandle check
@ -1662,7 +1663,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
assert(pState->numOfSub > 0); assert(pState->numOfSub > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, &pFinalModel, nBufferSize);
if (ret != 0) { if (ret != 0) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
@ -1677,7 +1678,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
if (pSql->pSubs == NULL) { if (pSql->pSubs == NULL) {
tfree(pSql->pSubs); tfree(pSql->pSubs);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub); tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel,pState->numOfSub);
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
return ret; return ret;
@ -1707,6 +1708,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs->subqueryIndex = i; trs->subqueryIndex = i;
trs->pParentSql = pSql; trs->pParentSql = pSql;
trs->pFinalColModel = pModel; trs->pFinalColModel = pModel;
trs->pFFColModel = pFinalModel;
SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL); SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
if (pNew == NULL) { if (pNew == NULL) {
@ -1730,13 +1732,13 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscError("%p failed to prepare subquery structure and launch subqueries", pSql); tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub); tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
doCleanupSubqueries(pSql, i); doCleanupSubqueries(pSql, i);
return pRes->code; // free all allocated resource return pRes->code; // free all allocated resource
} }
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub); tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
doCleanupSubqueries(pSql, i); doCleanupSubqueries(pSql, i);
return pRes->code; return pRes->code;
} }
@ -1876,7 +1878,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tstrerror(pParentSql->res.code)); tstrerror(pParentSql->res.code));
// release allocated resource // release allocated resource
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, trsupport->pFFColModel,
pState->numOfSub); pState->numOfSub);
tscFreeRetrieveSup(pSql); tscFreeRetrieveSup(pSql);
@ -2312,10 +2314,10 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
return; return;
} }
int32_t totalSize = tscGetResRowLength(pQueryInfo->exprList); int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList);
assert(numOfRes * totalSize > 0); assert(numOfRes * rowSize > 0);
char* tmp = realloc(pRes->pRsp, numOfRes * totalSize); char* tmp = realloc(pRes->pRsp, numOfRes * rowSize + sizeof(tFilePage));
if (tmp == NULL) { if (tmp == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return; return;
@ -2323,9 +2325,12 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
pRes->pRsp = tmp; pRes->pRsp = tmp;
} }
pRes->data = pRes->pRsp; tFilePage* pFilePage = (tFilePage*) pRes->pRsp;
pFilePage->num = numOfRes;
pRes->data = pFilePage->data;
char* data = pRes->data; char* data = pRes->data;
int16_t bytes = 0; int16_t bytes = 0;
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
@ -2352,6 +2357,16 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
pRes->numOfRows = numOfRes; pRes->numOfRows = numOfRes;
pRes->numOfClauseTotal += numOfRes; pRes->numOfClauseTotal += numOfRes;
int32_t finalRowSize = 0;
for(int32_t i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
finalRowSize += pField->bytes;
}
doArithmeticCalculate(pQueryInfo, pFilePage, rowSize, finalRowSize);
pRes->data = pFilePage->data;
} }
void tscBuildResFromSubqueries(SSqlObj *pSql) { void tscBuildResFromSubqueries(SSqlObj *pSql) {

View File

@ -56,6 +56,23 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- for restful -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.8</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>

View File

@ -0,0 +1,161 @@
package com.taosdata.jdbc;
import java.io.*;
import java.sql.Driver;
import java.sql.DriverPropertyInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.StringTokenizer;
public abstract class AbstractTaosDriver implements Driver {
private static final String TAOS_CFG_FILENAME = "taos.cfg";
/**
* @param cfgDirPath
* @return return the config dir
**/
protected File loadConfigDir(String cfgDirPath) {
if (cfgDirPath == null)
return loadDefaultConfigDir();
File cfgDir = new File(cfgDirPath);
if (!cfgDir.exists())
return loadDefaultConfigDir();
return cfgDir;
}
/**
* @return search the default config dir, if the config dir is not exist will return null
*/
protected File loadDefaultConfigDir() {
File cfgDir;
File cfgDir_linux = new File("/etc/taos");
cfgDir = cfgDir_linux.exists() ? cfgDir_linux : null;
File cfgDir_windows = new File("C:\\TDengine\\cfg");
cfgDir = (cfgDir == null && cfgDir_windows.exists()) ? cfgDir_windows : cfgDir;
return cfgDir;
}
protected List<String> loadConfigEndpoints(File cfgFile) {
List<String> endpoints = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new FileReader(cfgFile))) {
String line = null;
while ((line = reader.readLine()) != null) {
if (line.trim().startsWith("firstEp") || line.trim().startsWith("secondEp")) {
endpoints.add(line.substring(line.indexOf('p') + 1).trim());
}
if (endpoints.size() > 1)
break;
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return endpoints;
}
protected void loadTaosConfig(Properties info) {
if ((info.getProperty(TSDBDriver.PROPERTY_KEY_HOST) == null ||
info.getProperty(TSDBDriver.PROPERTY_KEY_HOST).isEmpty()) && (
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT) == null ||
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT).isEmpty())) {
File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR));
File cfgFile = cfgDir.listFiles((dir, name) -> TAOS_CFG_FILENAME.equalsIgnoreCase(name))[0];
List<String> endpoints = loadConfigEndpoints(cfgFile);
if (!endpoints.isEmpty()) {
info.setProperty(TSDBDriver.PROPERTY_KEY_HOST, endpoints.get(0).split(":")[0]);
info.setProperty(TSDBDriver.PROPERTY_KEY_PORT, endpoints.get(0).split(":")[1]);
}
}
}
protected DriverPropertyInfo[] getPropertyInfo(Properties info) {
DriverPropertyInfo hostProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_HOST, info.getProperty(TSDBDriver.PROPERTY_KEY_HOST));
hostProp.required = false;
hostProp.description = "Hostname";
DriverPropertyInfo portProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_PORT, info.getProperty(TSDBDriver.PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT));
portProp.required = false;
portProp.description = "Port";
DriverPropertyInfo dbProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_DBNAME, info.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME));
dbProp.required = false;
dbProp.description = "Database name";
DriverPropertyInfo userProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_USER, info.getProperty(TSDBDriver.PROPERTY_KEY_USER));
userProp.required = true;
userProp.description = "User";
DriverPropertyInfo passwordProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_PASSWORD, info.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD));
passwordProp.required = true;
passwordProp.description = "Password";
DriverPropertyInfo[] propertyInfo = new DriverPropertyInfo[5];
propertyInfo[0] = hostProp;
propertyInfo[1] = portProp;
propertyInfo[2] = dbProp;
propertyInfo[3] = userProp;
propertyInfo[4] = passwordProp;
return propertyInfo;
}
protected Properties parseURL(String url, Properties defaults) {
Properties urlProps = (defaults != null) ? defaults : new Properties();
// parse properties
int beginningOfSlashes = url.indexOf("//");
int index = url.indexOf("?");
if (index != -1) {
String paramString = url.substring(index + 1, url.length());
url = url.substring(0, index);
StringTokenizer queryParams = new StringTokenizer(paramString, "&");
while (queryParams.hasMoreElements()) {
String parameterValuePair = queryParams.nextToken();
int indexOfEqual = parameterValuePair.indexOf("=");
String parameter = null;
String value = null;
if (indexOfEqual != -1) {
parameter = parameterValuePair.substring(0, indexOfEqual);
if (indexOfEqual + 1 < parameterValuePair.length()) {
value = parameterValuePair.substring(indexOfEqual + 1);
}
}
if ((value != null && value.length() > 0) && (parameter != null && parameter.length() > 0)) {
urlProps.setProperty(parameter, value);
}
}
}
// parse Product Name
String dbProductName = url.substring(0, beginningOfSlashes);
dbProductName = dbProductName.substring(dbProductName.indexOf(":") + 1);
dbProductName = dbProductName.substring(0, dbProductName.indexOf(":"));
// parse dbname
url = url.substring(beginningOfSlashes + 2);
int indexOfSlash = url.indexOf("/");
if (indexOfSlash != -1) {
if (indexOfSlash + 1 < url.length()) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_DBNAME, url.substring(indexOfSlash + 1));
}
url = url.substring(0, indexOfSlash);
}
// parse port
int indexOfColon = url.indexOf(":");
if (indexOfColon != -1) {
if (indexOfColon + 1 < url.length()) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_PORT, url.substring(indexOfColon + 1));
}
url = url.substring(0, indexOfColon);
}
// parse host
if (url != null && url.length() > 0 && url.trim().length() > 0) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_HOST, url);
}
return urlProps;
}
}

View File

@ -14,7 +14,6 @@
*****************************************************************************/ *****************************************************************************/
package com.taosdata.jdbc; package com.taosdata.jdbc;
import java.io.*;
import java.sql.*; import java.sql.*;
import java.util.*; import java.util.*;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -38,7 +37,7 @@ import java.util.logging.Logger;
* register it with the DriverManager. This means that a user can load and * register it with the DriverManager. This means that a user can load and
* register a driver by doing Class.forName("foo.bah.Driver") * register a driver by doing Class.forName("foo.bah.Driver")
*/ */
public class TSDBDriver implements java.sql.Driver { public class TSDBDriver extends AbstractTaosDriver {
@Deprecated @Deprecated
private static final String URL_PREFIX1 = "jdbc:TSDB://"; private static final String URL_PREFIX1 = "jdbc:TSDB://";
@ -97,50 +96,6 @@ public class TSDBDriver implements java.sql.Driver {
} }
} }
private List<String> loadConfigEndpoints(File cfgFile) {
List<String> endpoints = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new FileReader(cfgFile))) {
String line = null;
while ((line = reader.readLine()) != null) {
if (line.trim().startsWith("firstEp") || line.trim().startsWith("secondEp")) {
endpoints.add(line.substring(line.indexOf('p') + 1).trim());
}
if (endpoints.size() > 1)
break;
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return endpoints;
}
/**
* @param cfgDirPath
* @return return the config dir
**/
private File loadConfigDir(String cfgDirPath) {
if (cfgDirPath == null)
return loadDefaultConfigDir();
File cfgDir = new File(cfgDirPath);
if (!cfgDir.exists())
return loadDefaultConfigDir();
return cfgDir;
}
/**
* @return search the default config dir, if the config dir is not exist will return null
*/
private File loadDefaultConfigDir() {
File cfgDir;
File cfgDir_linux = new File("/etc/taos");
cfgDir = cfgDir_linux.exists() ? cfgDir_linux : null;
File cfgDir_windows = new File("C:\\TDengine\\cfg");
cfgDir = (cfgDir == null && cfgDir_windows.exists()) ? cfgDir_windows : cfgDir;
return cfgDir;
}
public Connection connect(String url, Properties info) throws SQLException { public Connection connect(String url, Properties info) throws SQLException {
if (url == null) if (url == null)
throw new SQLException(TSDBConstants.WrapErrMsg("url is not set!")); throw new SQLException(TSDBConstants.WrapErrMsg("url is not set!"));
@ -152,26 +107,12 @@ public class TSDBDriver implements java.sql.Driver {
if ((props = parseURL(url, info)) == null) { if ((props = parseURL(url, info)) == null) {
return null; return null;
} }
//load taos.cfg start //load taos.cfg start
if ((info.getProperty(TSDBDriver.PROPERTY_KEY_HOST) == null || loadTaosConfig(info);
info.getProperty(TSDBDriver.PROPERTY_KEY_HOST).isEmpty()) && (
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT) == null ||
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT).isEmpty())) {
File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR));
File cfgFile = cfgDir.listFiles((dir, name) -> "taos.cfg".equalsIgnoreCase(name))[0];
List<String> endpoints = loadConfigEndpoints(cfgFile);
if (!endpoints.isEmpty()) {
info.setProperty(TSDBDriver.PROPERTY_KEY_HOST, endpoints.get(0).split(":")[0]);
info.setProperty(TSDBDriver.PROPERTY_KEY_PORT, endpoints.get(0).split(":")[1]);
}
}
try { try {
TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR), TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR), (String) props.get(PROPERTY_KEY_LOCALE),
(String) props.get(PROPERTY_KEY_LOCALE), (String) props.get(PROPERTY_KEY_CHARSET), (String) props.get(PROPERTY_KEY_TIME_ZONE));
(String) props.get(PROPERTY_KEY_CHARSET),
(String) props.get(PROPERTY_KEY_TIME_ZONE));
Connection newConn = new TSDBConnection(props, this.dbMetaData); Connection newConn = new TSDBConnection(props, this.dbMetaData);
return newConn; return newConn;
} catch (SQLWarning sqlWarning) { } catch (SQLWarning sqlWarning) {
@ -208,39 +149,13 @@ public class TSDBDriver implements java.sql.Driver {
info = parseURL(url, info); info = parseURL(url, info);
} }
DriverPropertyInfo hostProp = new DriverPropertyInfo(PROPERTY_KEY_HOST, info.getProperty(PROPERTY_KEY_HOST)); return getPropertyInfo(info);
hostProp.required = false;
hostProp.description = "Hostname";
DriverPropertyInfo portProp = new DriverPropertyInfo(PROPERTY_KEY_PORT, info.getProperty(PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT));
portProp.required = false;
portProp.description = "Port";
DriverPropertyInfo dbProp = new DriverPropertyInfo(PROPERTY_KEY_DBNAME, info.getProperty(PROPERTY_KEY_DBNAME));
dbProp.required = false;
dbProp.description = "Database name";
DriverPropertyInfo userProp = new DriverPropertyInfo(PROPERTY_KEY_USER, info.getProperty(PROPERTY_KEY_USER));
userProp.required = true;
userProp.description = "User";
DriverPropertyInfo passwordProp = new DriverPropertyInfo(PROPERTY_KEY_PASSWORD, info.getProperty(PROPERTY_KEY_PASSWORD));
passwordProp.required = true;
passwordProp.description = "Password";
DriverPropertyInfo[] propertyInfo = new DriverPropertyInfo[5];
propertyInfo[0] = hostProp;
propertyInfo[1] = portProp;
propertyInfo[2] = dbProp;
propertyInfo[3] = userProp;
propertyInfo[4] = passwordProp;
return propertyInfo;
} }
/** /**
* example: jdbc:TAOS://127.0.0.1:0/db?user=root&password=your_password * example: jdbc:TAOS://127.0.0.1:0/db?user=root&password=your_password
*/ */
@Override
public Properties parseURL(String url, Properties defaults) { public Properties parseURL(String url, Properties defaults) {
Properties urlProps = (defaults != null) ? defaults : new Properties(); Properties urlProps = (defaults != null) ? defaults : new Properties();
if (url == null || url.length() <= 0 || url.trim().length() <= 0) if (url == null || url.length() <= 0 || url.trim().length() <= 0)
@ -296,86 +211,10 @@ public class TSDBDriver implements java.sql.Driver {
if (url != null && url.length() > 0 && url.trim().length() > 0) { if (url != null && url.length() > 0 && url.trim().length() > 0) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_HOST, url); urlProps.setProperty(TSDBDriver.PROPERTY_KEY_HOST, url);
} }
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, urlProps.getProperty(TSDBDriver.PROPERTY_KEY_USER)); this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, urlProps.getProperty(TSDBDriver.PROPERTY_KEY_USER));
/*
String urlForMeta = url;
String dbProductName = url.substring(url.indexOf(":") + 1);
dbProductName = dbProductName.substring(0, dbProductName.indexOf(":"));
int beginningOfSlashes = url.indexOf("//");
url = url.substring(beginningOfSlashes + 2);
String host = url.substring(0, url.indexOf(":"));
url = url.substring(url.indexOf(":") + 1);
urlProps.setProperty(PROPERTY_KEY_HOST, host);
String port = url.substring(0, url.indexOf("/"));
urlProps.setProperty(PROPERTY_KEY_PORT, port);
url = url.substring(url.indexOf("/") + 1);
if (url.indexOf("?") != -1) {
String dbName = url.substring(0, url.indexOf("?"));
urlProps.setProperty(PROPERTY_KEY_DBNAME, dbName);
url = url.trim().substring(url.indexOf("?") + 1);
} else {
// without user & password so return
if (!url.trim().isEmpty()) {
String dbName = url.trim();
urlProps.setProperty(PROPERTY_KEY_DBNAME, dbName);
}
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, urlProps.getProperty("user"));
return urlProps;
}
String user = "";
if (url.indexOf("&") == -1) {
String[] kvPair = url.trim().split("=");
if (kvPair.length == 2) {
setPropertyValue(urlProps, kvPair);
return urlProps;
}
}
String[] queryStrings = url.trim().split("&");
for (String queryStr : queryStrings) {
String[] kvPair = queryStr.trim().split("=");
if (kvPair.length < 2) {
continue;
}
setPropertyValue(urlProps, kvPair);
}
user = urlProps.getProperty(PROPERTY_KEY_USER).toString();
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, user);
*/
return urlProps; return urlProps;
} }
private void setPropertyValue(Properties property, String[] keyValuePair) {
switch (keyValuePair[0].toLowerCase()) {
case PROPERTY_KEY_USER:
property.setProperty(PROPERTY_KEY_USER, keyValuePair[1]);
break;
case PROPERTY_KEY_PASSWORD:
property.setProperty(PROPERTY_KEY_PASSWORD, keyValuePair[1]);
break;
case PROPERTY_KEY_TIME_ZONE:
property.setProperty(PROPERTY_KEY_TIME_ZONE, keyValuePair[1]);
break;
case PROPERTY_KEY_LOCALE:
property.setProperty(PROPERTY_KEY_LOCALE, keyValuePair[1]);
break;
case PROPERTY_KEY_CHARSET:
property.setProperty(PROPERTY_KEY_CHARSET, keyValuePair[1]);
break;
case PROPERTY_KEY_CONFIG_DIR:
property.setProperty(PROPERTY_KEY_CONFIG_DIR, keyValuePair[1]);
break;
}
}
public int getMajorVersion() { public int getMajorVersion() {
return 2; return 2;
} }

View File

@ -0,0 +1,319 @@
package com.taosdata.jdbc.rs;
import com.taosdata.jdbc.TSDBConstants;
import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
public class RestfulConnection implements Connection {
private final String host;
private final int port;
private final Properties props;
private final String database;
private final String url;
public RestfulConnection(String host, String port, Properties props, String database, String url) {
this.host = host;
this.port = Integer.parseInt(port);
this.props = props;
this.database = database;
this.url = url;
}
@Override
public Statement createStatement() throws SQLException {
if (isClosed())
throw new SQLException(TSDBConstants.WrapErrMsg("restful TDengine connection is closed."));
return new RestfulStatement(this, this.database);
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
return null;
}
@Override
public CallableStatement prepareCall(String sql) throws SQLException {
return null;
}
@Override
public String nativeSQL(String sql) throws SQLException {
return null;
}
@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
}
@Override
public boolean getAutoCommit() throws SQLException {
return false;
}
@Override
public void commit() throws SQLException {
}
@Override
public void rollback() throws SQLException {
}
@Override
public void close() throws SQLException {
}
@Override
public boolean isClosed() throws SQLException {
return false;
}
@Override
public DatabaseMetaData getMetaData() throws SQLException {
//TODO: RestfulDatabaseMetaData is not implemented
return new RestfulDatabaseMetaData();
}
@Override
public void setReadOnly(boolean readOnly) throws SQLException {
}
@Override
public boolean isReadOnly() throws SQLException {
return false;
}
@Override
public void setCatalog(String catalog) throws SQLException {
}
@Override
public String getCatalog() throws SQLException {
return null;
}
@Override
public void setTransactionIsolation(int level) throws SQLException {
}
@Override
public int getTransactionIsolation() throws SQLException {
return 0;
}
@Override
public SQLWarning getWarnings() throws SQLException {
return null;
}
@Override
public void clearWarnings() throws SQLException {
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
}
@Override
public Map<String, Class<?>> getTypeMap() throws SQLException {
return null;
}
@Override
public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
}
@Override
public void setHoldability(int holdability) throws SQLException {
}
@Override
public int getHoldability() throws SQLException {
return 0;
}
@Override
public Savepoint setSavepoint() throws SQLException {
return null;
}
@Override
public Savepoint setSavepoint(String name) throws SQLException {
return null;
}
@Override
public void rollback(Savepoint savepoint) throws SQLException {
}
@Override
public void releaseSavepoint(Savepoint savepoint) throws SQLException {
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
return null;
}
@Override
public Clob createClob() throws SQLException {
return null;
}
@Override
public Blob createBlob() throws SQLException {
return null;
}
@Override
public NClob createNClob() throws SQLException {
return null;
}
@Override
public SQLXML createSQLXML() throws SQLException {
return null;
}
@Override
public boolean isValid(int timeout) throws SQLException {
return false;
}
@Override
public void setClientInfo(String name, String value) throws SQLClientInfoException {
}
@Override
public void setClientInfo(Properties properties) throws SQLClientInfoException {
}
@Override
public String getClientInfo(String name) throws SQLException {
return null;
}
@Override
public Properties getClientInfo() throws SQLException {
return null;
}
@Override
public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
return null;
}
@Override
public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
return null;
}
@Override
public void setSchema(String schema) throws SQLException {
}
@Override
public String getSchema() throws SQLException {
return null;
}
@Override
public void abort(Executor executor) throws SQLException {
}
@Override
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
}
@Override
public int getNetworkTimeout() throws SQLException {
return 0;
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return null;
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return false;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
public Properties getProps() {
return props;
}
public String getDatabase() {
return database;
}
public String getUrl() {
return url;
}
}

View File

@ -0,0 +1,886 @@
package com.taosdata.jdbc.rs;
import java.sql.*;
public class RestfulDatabaseMetaData implements DatabaseMetaData {
@Override
public boolean allProceduresAreCallable() throws SQLException {
return false;
}
@Override
public boolean allTablesAreSelectable() throws SQLException {
return false;
}
@Override
public String getURL() throws SQLException {
return null;
}
@Override
public String getUserName() throws SQLException {
return null;
}
@Override
public boolean isReadOnly() throws SQLException {
return false;
}
@Override
public boolean nullsAreSortedHigh() throws SQLException {
return false;
}
@Override
public boolean nullsAreSortedLow() throws SQLException {
return false;
}
@Override
public boolean nullsAreSortedAtStart() throws SQLException {
return false;
}
@Override
public boolean nullsAreSortedAtEnd() throws SQLException {
return false;
}
@Override
public String getDatabaseProductName() throws SQLException {
return null;
}
@Override
public String getDatabaseProductVersion() throws SQLException {
return null;
}
@Override
public String getDriverName() throws SQLException {
return null;
}
@Override
public String getDriverVersion() throws SQLException {
return null;
}
@Override
public int getDriverMajorVersion() {
return 0;
}
@Override
public int getDriverMinorVersion() {
return 0;
}
@Override
public boolean usesLocalFiles() throws SQLException {
return false;
}
@Override
public boolean usesLocalFilePerTable() throws SQLException {
return false;
}
@Override
public boolean supportsMixedCaseIdentifiers() throws SQLException {
return false;
}
@Override
public boolean storesUpperCaseIdentifiers() throws SQLException {
return false;
}
@Override
public boolean storesLowerCaseIdentifiers() throws SQLException {
return false;
}
@Override
public boolean storesMixedCaseIdentifiers() throws SQLException {
return false;
}
@Override
public boolean supportsMixedCaseQuotedIdentifiers() throws SQLException {
return false;
}
@Override
public boolean storesUpperCaseQuotedIdentifiers() throws SQLException {
return false;
}
@Override
public boolean storesLowerCaseQuotedIdentifiers() throws SQLException {
return false;
}
@Override
public boolean storesMixedCaseQuotedIdentifiers() throws SQLException {
return false;
}
@Override
public String getIdentifierQuoteString() throws SQLException {
return null;
}
@Override
public String getSQLKeywords() throws SQLException {
return null;
}
@Override
public String getNumericFunctions() throws SQLException {
return null;
}
@Override
public String getStringFunctions() throws SQLException {
return null;
}
@Override
public String getSystemFunctions() throws SQLException {
return null;
}
@Override
public String getTimeDateFunctions() throws SQLException {
return null;
}
@Override
public String getSearchStringEscape() throws SQLException {
return null;
}
@Override
public String getExtraNameCharacters() throws SQLException {
return null;
}
@Override
public boolean supportsAlterTableWithAddColumn() throws SQLException {
return false;
}
@Override
public boolean supportsAlterTableWithDropColumn() throws SQLException {
return false;
}
@Override
public boolean supportsColumnAliasing() throws SQLException {
return false;
}
@Override
public boolean nullPlusNonNullIsNull() throws SQLException {
return false;
}
@Override
public boolean supportsConvert() throws SQLException {
return false;
}
@Override
public boolean supportsConvert(int fromType, int toType) throws SQLException {
return false;
}
@Override
public boolean supportsTableCorrelationNames() throws SQLException {
return false;
}
@Override
public boolean supportsDifferentTableCorrelationNames() throws SQLException {
return false;
}
@Override
public boolean supportsExpressionsInOrderBy() throws SQLException {
return false;
}
@Override
public boolean supportsOrderByUnrelated() throws SQLException {
return false;
}
@Override
public boolean supportsGroupBy() throws SQLException {
return false;
}
@Override
public boolean supportsGroupByUnrelated() throws SQLException {
return false;
}
@Override
public boolean supportsGroupByBeyondSelect() throws SQLException {
return false;
}
@Override
public boolean supportsLikeEscapeClause() throws SQLException {
return false;
}
@Override
public boolean supportsMultipleResultSets() throws SQLException {
return false;
}
@Override
public boolean supportsMultipleTransactions() throws SQLException {
return false;
}
@Override
public boolean supportsNonNullableColumns() throws SQLException {
return false;
}
@Override
public boolean supportsMinimumSQLGrammar() throws SQLException {
return false;
}
@Override
public boolean supportsCoreSQLGrammar() throws SQLException {
return false;
}
@Override
public boolean supportsExtendedSQLGrammar() throws SQLException {
return false;
}
@Override
public boolean supportsANSI92EntryLevelSQL() throws SQLException {
return false;
}
@Override
public boolean supportsANSI92IntermediateSQL() throws SQLException {
return false;
}
@Override
public boolean supportsANSI92FullSQL() throws SQLException {
return false;
}
@Override
public boolean supportsIntegrityEnhancementFacility() throws SQLException {
return false;
}
@Override
public boolean supportsOuterJoins() throws SQLException {
return false;
}
@Override
public boolean supportsFullOuterJoins() throws SQLException {
return false;
}
@Override
public boolean supportsLimitedOuterJoins() throws SQLException {
return false;
}
@Override
public String getSchemaTerm() throws SQLException {
return null;
}
@Override
public String getProcedureTerm() throws SQLException {
return null;
}
@Override
public String getCatalogTerm() throws SQLException {
return null;
}
@Override
public boolean isCatalogAtStart() throws SQLException {
return false;
}
@Override
public String getCatalogSeparator() throws SQLException {
return null;
}
@Override
public boolean supportsSchemasInDataManipulation() throws SQLException {
return false;
}
@Override
public boolean supportsSchemasInProcedureCalls() throws SQLException {
return false;
}
@Override
public boolean supportsSchemasInTableDefinitions() throws SQLException {
return false;
}
@Override
public boolean supportsSchemasInIndexDefinitions() throws SQLException {
return false;
}
@Override
public boolean supportsSchemasInPrivilegeDefinitions() throws SQLException {
return false;
}
@Override
public boolean supportsCatalogsInDataManipulation() throws SQLException {
return false;
}
@Override
public boolean supportsCatalogsInProcedureCalls() throws SQLException {
return false;
}
@Override
public boolean supportsCatalogsInTableDefinitions() throws SQLException {
return false;
}
@Override
public boolean supportsCatalogsInIndexDefinitions() throws SQLException {
return false;
}
@Override
public boolean supportsCatalogsInPrivilegeDefinitions() throws SQLException {
return false;
}
@Override
public boolean supportsPositionedDelete() throws SQLException {
return false;
}
@Override
public boolean supportsPositionedUpdate() throws SQLException {
return false;
}
@Override
public boolean supportsSelectForUpdate() throws SQLException {
return false;
}
@Override
public boolean supportsStoredProcedures() throws SQLException {
return false;
}
@Override
public boolean supportsSubqueriesInComparisons() throws SQLException {
return false;
}
@Override
public boolean supportsSubqueriesInExists() throws SQLException {
return false;
}
@Override
public boolean supportsSubqueriesInIns() throws SQLException {
return false;
}
@Override
public boolean supportsSubqueriesInQuantifieds() throws SQLException {
return false;
}
@Override
public boolean supportsCorrelatedSubqueries() throws SQLException {
return false;
}
@Override
public boolean supportsUnion() throws SQLException {
return false;
}
@Override
public boolean supportsUnionAll() throws SQLException {
return false;
}
@Override
public boolean supportsOpenCursorsAcrossCommit() throws SQLException {
return false;
}
@Override
public boolean supportsOpenCursorsAcrossRollback() throws SQLException {
return false;
}
@Override
public boolean supportsOpenStatementsAcrossCommit() throws SQLException {
return false;
}
@Override
public boolean supportsOpenStatementsAcrossRollback() throws SQLException {
return false;
}
@Override
public int getMaxBinaryLiteralLength() throws SQLException {
return 0;
}
@Override
public int getMaxCharLiteralLength() throws SQLException {
return 0;
}
@Override
public int getMaxColumnNameLength() throws SQLException {
return 0;
}
@Override
public int getMaxColumnsInGroupBy() throws SQLException {
return 0;
}
@Override
public int getMaxColumnsInIndex() throws SQLException {
return 0;
}
@Override
public int getMaxColumnsInOrderBy() throws SQLException {
return 0;
}
@Override
public int getMaxColumnsInSelect() throws SQLException {
return 0;
}
@Override
public int getMaxColumnsInTable() throws SQLException {
return 0;
}
@Override
public int getMaxConnections() throws SQLException {
return 0;
}
@Override
public int getMaxCursorNameLength() throws SQLException {
return 0;
}
@Override
public int getMaxIndexLength() throws SQLException {
return 0;
}
@Override
public int getMaxSchemaNameLength() throws SQLException {
return 0;
}
@Override
public int getMaxProcedureNameLength() throws SQLException {
return 0;
}
@Override
public int getMaxCatalogNameLength() throws SQLException {
return 0;
}
@Override
public int getMaxRowSize() throws SQLException {
return 0;
}
@Override
public boolean doesMaxRowSizeIncludeBlobs() throws SQLException {
return false;
}
@Override
public int getMaxStatementLength() throws SQLException {
return 0;
}
@Override
public int getMaxStatements() throws SQLException {
return 0;
}
@Override
public int getMaxTableNameLength() throws SQLException {
return 0;
}
@Override
public int getMaxTablesInSelect() throws SQLException {
return 0;
}
@Override
public int getMaxUserNameLength() throws SQLException {
return 0;
}
@Override
public int getDefaultTransactionIsolation() throws SQLException {
return 0;
}
@Override
public boolean supportsTransactions() throws SQLException {
return false;
}
@Override
public boolean supportsTransactionIsolationLevel(int level) throws SQLException {
return false;
}
@Override
public boolean supportsDataDefinitionAndDataManipulationTransactions() throws SQLException {
return false;
}
@Override
public boolean supportsDataManipulationTransactionsOnly() throws SQLException {
return false;
}
@Override
public boolean dataDefinitionCausesTransactionCommit() throws SQLException {
return false;
}
@Override
public boolean dataDefinitionIgnoredInTransactions() throws SQLException {
return false;
}
@Override
public ResultSet getProcedures(String catalog, String schemaPattern, String procedureNamePattern) throws SQLException {
return null;
}
@Override
public ResultSet getProcedureColumns(String catalog, String schemaPattern, String procedureNamePattern, String columnNamePattern) throws SQLException {
return null;
}
@Override
public ResultSet getTables(String catalog, String schemaPattern, String tableNamePattern, String[] types) throws SQLException {
return null;
}
@Override
public ResultSet getSchemas() throws SQLException {
return null;
}
@Override
public ResultSet getCatalogs() throws SQLException {
return null;
}
@Override
public ResultSet getTableTypes() throws SQLException {
return null;
}
@Override
public ResultSet getColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) throws SQLException {
return null;
}
@Override
public ResultSet getColumnPrivileges(String catalog, String schema, String table, String columnNamePattern) throws SQLException {
return null;
}
@Override
public ResultSet getTablePrivileges(String catalog, String schemaPattern, String tableNamePattern) throws SQLException {
return null;
}
@Override
public ResultSet getBestRowIdentifier(String catalog, String schema, String table, int scope, boolean nullable) throws SQLException {
return null;
}
@Override
public ResultSet getVersionColumns(String catalog, String schema, String table) throws SQLException {
return null;
}
@Override
public ResultSet getPrimaryKeys(String catalog, String schema, String table) throws SQLException {
return null;
}
@Override
public ResultSet getImportedKeys(String catalog, String schema, String table) throws SQLException {
return null;
}
@Override
public ResultSet getExportedKeys(String catalog, String schema, String table) throws SQLException {
return null;
}
@Override
public ResultSet getCrossReference(String parentCatalog, String parentSchema, String parentTable, String foreignCatalog, String foreignSchema, String foreignTable) throws SQLException {
return null;
}
@Override
public ResultSet getTypeInfo() throws SQLException {
return null;
}
@Override
public ResultSet getIndexInfo(String catalog, String schema, String table, boolean unique, boolean approximate) throws SQLException {
return null;
}
@Override
public boolean supportsResultSetType(int type) throws SQLException {
return false;
}
@Override
public boolean supportsResultSetConcurrency(int type, int concurrency) throws SQLException {
return false;
}
@Override
public boolean ownUpdatesAreVisible(int type) throws SQLException {
return false;
}
@Override
public boolean ownDeletesAreVisible(int type) throws SQLException {
return false;
}
@Override
public boolean ownInsertsAreVisible(int type) throws SQLException {
return false;
}
@Override
public boolean othersUpdatesAreVisible(int type) throws SQLException {
return false;
}
@Override
public boolean othersDeletesAreVisible(int type) throws SQLException {
return false;
}
@Override
public boolean othersInsertsAreVisible(int type) throws SQLException {
return false;
}
@Override
public boolean updatesAreDetected(int type) throws SQLException {
return false;
}
@Override
public boolean deletesAreDetected(int type) throws SQLException {
return false;
}
@Override
public boolean insertsAreDetected(int type) throws SQLException {
return false;
}
@Override
public boolean supportsBatchUpdates() throws SQLException {
return false;
}
@Override
public ResultSet getUDTs(String catalog, String schemaPattern, String typeNamePattern, int[] types) throws SQLException {
return null;
}
@Override
public Connection getConnection() throws SQLException {
return null;
}
@Override
public boolean supportsSavepoints() throws SQLException {
return false;
}
@Override
public boolean supportsNamedParameters() throws SQLException {
return false;
}
@Override
public boolean supportsMultipleOpenResults() throws SQLException {
return false;
}
@Override
public boolean supportsGetGeneratedKeys() throws SQLException {
return false;
}
@Override
public ResultSet getSuperTypes(String catalog, String schemaPattern, String typeNamePattern) throws SQLException {
return null;
}
@Override
public ResultSet getSuperTables(String catalog, String schemaPattern, String tableNamePattern) throws SQLException {
return null;
}
@Override
public ResultSet getAttributes(String catalog, String schemaPattern, String typeNamePattern, String attributeNamePattern) throws SQLException {
return null;
}
@Override
public boolean supportsResultSetHoldability(int holdability) throws SQLException {
return false;
}
@Override
public int getResultSetHoldability() throws SQLException {
return 0;
}
@Override
public int getDatabaseMajorVersion() throws SQLException {
return 0;
}
@Override
public int getDatabaseMinorVersion() throws SQLException {
return 0;
}
@Override
public int getJDBCMajorVersion() throws SQLException {
return 0;
}
@Override
public int getJDBCMinorVersion() throws SQLException {
return 0;
}
@Override
public int getSQLStateType() throws SQLException {
return 0;
}
@Override
public boolean locatorsUpdateCopy() throws SQLException {
return false;
}
@Override
public boolean supportsStatementPooling() throws SQLException {
return false;
}
@Override
public RowIdLifetime getRowIdLifetime() throws SQLException {
return null;
}
@Override
public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException {
return null;
}
@Override
public boolean supportsStoredFunctionsUsingCallSyntax() throws SQLException {
return false;
}
@Override
public boolean autoCommitFailureClosesAllResultSets() throws SQLException {
return false;
}
@Override
public ResultSet getClientInfoProperties() throws SQLException {
return null;
}
@Override
public ResultSet getFunctions(String catalog, String schemaPattern, String functionNamePattern) throws SQLException {
return null;
}
@Override
public ResultSet getFunctionColumns(String catalog, String schemaPattern, String functionNamePattern, String columnNamePattern) throws SQLException {
return null;
}
@Override
public ResultSet getPseudoColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) throws SQLException {
return null;
}
@Override
public boolean generatedKeyAlwaysReturned() throws SQLException {
return false;
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return null;
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return false;
}
}

View File

@ -0,0 +1,91 @@
package com.taosdata.jdbc.rs;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.AbstractTaosDriver;
import com.taosdata.jdbc.TSDBConstants;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.rs.util.HttpClientPoolUtil;
import java.sql.*;
import java.util.Properties;
import java.util.logging.Logger;
public class RestfulDriver extends AbstractTaosDriver {
private static final String URL_PREFIX = "jdbc:TAOS-RS://";
static {
try {
DriverManager.registerDriver(new RestfulDriver());
} catch (SQLException e) {
throw new RuntimeException(TSDBConstants.WrapErrMsg("can not register Restful JDBC driver"), e);
}
}
@Override
public Connection connect(String url, Properties info) throws SQLException {
// throw SQLException if url is null
if (url == null)
throw new SQLException(TSDBConstants.WrapErrMsg("url is not set!"));
// return null if url is not be accepted
if (!acceptsURL(url))
return null;
Properties props = parseURL(url, info);
String host = props.getProperty(TSDBDriver.PROPERTY_KEY_HOST, "localhost");
String port = props.getProperty(TSDBDriver.PROPERTY_KEY_PORT, "6041");
String database = props.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME);
String loginUrl = "http://" + props.getProperty(TSDBDriver.PROPERTY_KEY_HOST) + ":"
+ props.getProperty(TSDBDriver.PROPERTY_KEY_PORT) + "/rest/login/"
+ props.getProperty(TSDBDriver.PROPERTY_KEY_USER) + "/"
+ props.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD) + "";
String result = HttpClientPoolUtil.execute(loginUrl);
JSONObject jsonResult = JSON.parseObject(result);
String status = jsonResult.getString("status");
if (!status.equals("succ")) {
throw new SQLException(jsonResult.getString("desc"));
}
return new RestfulConnection(host, port, props, database, url);
}
@Override
public boolean acceptsURL(String url) throws SQLException {
if (url == null)
throw new SQLException(TSDBConstants.WrapErrMsg("url is null"));
return (url != null && url.length() > 0 && url.trim().length() > 0) && url.startsWith(URL_PREFIX);
}
@Override
public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
if (info == null) {
info = new Properties();
}
if (acceptsURL(url)) {
info = parseURL(url, info);
}
return getPropertyInfo(info);
}
@Override
public int getMajorVersion() {
return 2;
}
@Override
public int getMinorVersion() {
return 0;
}
@Override
public boolean jdbcCompliant() {
return false;
}
@Override
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return null;
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,129 @@
package com.taosdata.jdbc.rs;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.List;
public class RestfulResultSetMetaData implements ResultSetMetaData {
private List<String> fields;
public RestfulResultSetMetaData(List<String> fields) {
this.fields = fields;
}
@Override
public int getColumnCount() throws SQLException {
return fields.size();
}
@Override
public boolean isAutoIncrement(int column) throws SQLException {
return false;
}
@Override
public boolean isCaseSensitive(int column) throws SQLException {
return false;
}
@Override
public boolean isSearchable(int column) throws SQLException {
return false;
}
@Override
public boolean isCurrency(int column) throws SQLException {
return false;
}
@Override
public int isNullable(int column) throws SQLException {
return 0;
}
@Override
public boolean isSigned(int column) throws SQLException {
return false;
}
@Override
public int getColumnDisplaySize(int column) throws SQLException {
return 0;
}
@Override
public String getColumnLabel(int column) throws SQLException {
return fields.get(column - 1);
}
@Override
public String getColumnName(int column) throws SQLException {
return null;
}
@Override
public String getSchemaName(int column) throws SQLException {
return null;
}
@Override
public int getPrecision(int column) throws SQLException {
return 0;
}
@Override
public int getScale(int column) throws SQLException {
return 0;
}
@Override
public String getTableName(int column) throws SQLException {
return null;
}
@Override
public String getCatalogName(int column) throws SQLException {
return null;
}
@Override
public int getColumnType(int column) throws SQLException {
return 0;
}
@Override
public String getColumnTypeName(int column) throws SQLException {
return null;
}
@Override
public boolean isReadOnly(int column) throws SQLException {
return false;
}
@Override
public boolean isWritable(int column) throws SQLException {
return false;
}
@Override
public boolean isDefinitelyWritable(int column) throws SQLException {
return false;
}
@Override
public String getColumnClassName(int column) throws SQLException {
return null;
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return null;
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return false;
}
}

View File

@ -0,0 +1,280 @@
package com.taosdata.jdbc.rs;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.TSDBConstants;
import com.taosdata.jdbc.rs.util.HttpClientPoolUtil;
import java.sql.*;
import java.util.Arrays;
import java.util.List;
public class RestfulStatement implements Statement {
private final String catalog;
private final RestfulConnection conn;
public RestfulStatement(RestfulConnection c, String catalog) {
this.conn = c;
this.catalog = catalog;
}
@Override
public ResultSet executeQuery(String sql) throws SQLException {
final String url = "http://" + conn.getHost() + ":"+conn.getPort()+"/rest/sql";
String result = HttpClientPoolUtil.execute(url, sql);
String fields = "";
List<String> words = Arrays.asList(sql.split(" "));
if (words.get(0).equalsIgnoreCase("select")) {
int index = 0;
if (words.contains("from")) {
index = words.indexOf("from");
}
if (words.contains("FROM")) {
index = words.indexOf("FROM");
}
fields = HttpClientPoolUtil.execute(url, "DESCRIBE " + words.get(index + 1));
}
JSONObject jsonObject = JSON.parseObject(result);
if (jsonObject.getString("status").equals("error")) {
throw new SQLException(TSDBConstants.WrapErrMsg("SQL execution error: " +
jsonObject.getString("desc") + "\n" +
"error code: " + jsonObject.getString("code")));
}
String dataStr = jsonObject.getString("data");
if ("use".equalsIgnoreCase(fields.split(" ")[0])) {
return new RestfulResultSet(dataStr, "");
}
JSONObject jsonField = JSON.parseObject(fields);
if (jsonField == null) {
return new RestfulResultSet(dataStr, "");
}
if (jsonField.getString("status").equals("error")) {
throw new SQLException(TSDBConstants.WrapErrMsg("SQL execution error: " +
jsonField.getString("desc") + "\n" +
"error code: " + jsonField.getString("code")));
}
String fieldData = jsonField.getString("data");
return new RestfulResultSet(dataStr, fieldData);
}
@Override
public int executeUpdate(String sql) throws SQLException {
return 0;
}
@Override
public void close() throws SQLException {
}
@Override
public int getMaxFieldSize() throws SQLException {
return 0;
}
@Override
public void setMaxFieldSize(int max) throws SQLException {
}
@Override
public int getMaxRows() throws SQLException {
return 0;
}
@Override
public void setMaxRows(int max) throws SQLException {
}
@Override
public void setEscapeProcessing(boolean enable) throws SQLException {
}
@Override
public int getQueryTimeout() throws SQLException {
return 0;
}
@Override
public void setQueryTimeout(int seconds) throws SQLException {
}
@Override
public void cancel() throws SQLException {
}
@Override
public SQLWarning getWarnings() throws SQLException {
return null;
}
@Override
public void clearWarnings() throws SQLException {
}
@Override
public void setCursorName(String name) throws SQLException {
}
@Override
public boolean execute(String sql) throws SQLException {
return false;
}
@Override
public ResultSet getResultSet() throws SQLException {
return null;
}
@Override
public int getUpdateCount() throws SQLException {
return 0;
}
@Override
public boolean getMoreResults() throws SQLException {
return false;
}
@Override
public void setFetchDirection(int direction) throws SQLException {
}
@Override
public int getFetchDirection() throws SQLException {
return 0;
}
@Override
public void setFetchSize(int rows) throws SQLException {
}
@Override
public int getFetchSize() throws SQLException {
return 0;
}
@Override
public int getResultSetConcurrency() throws SQLException {
return 0;
}
@Override
public int getResultSetType() throws SQLException {
return 0;
}
@Override
public void addBatch(String sql) throws SQLException {
}
@Override
public void clearBatch() throws SQLException {
}
@Override
public int[] executeBatch() throws SQLException {
return new int[0];
}
@Override
public Connection getConnection() throws SQLException {
return null;
}
@Override
public boolean getMoreResults(int current) throws SQLException {
return false;
}
@Override
public ResultSet getGeneratedKeys() throws SQLException {
return null;
}
@Override
public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
return 0;
}
@Override
public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
return 0;
}
@Override
public int executeUpdate(String sql, String[] columnNames) throws SQLException {
return 0;
}
@Override
public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
return false;
}
@Override
public boolean execute(String sql, int[] columnIndexes) throws SQLException {
return false;
}
@Override
public boolean execute(String sql, String[] columnNames) throws SQLException {
return false;
}
@Override
public int getResultSetHoldability() throws SQLException {
return 0;
}
@Override
public boolean isClosed() throws SQLException {
return false;
}
@Override
public void setPoolable(boolean poolable) throws SQLException {
}
@Override
public boolean isPoolable() throws SQLException {
return false;
}
@Override
public void closeOnCompletion() throws SQLException {
}
@Override
public boolean isCloseOnCompletion() throws SQLException {
return false;
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return null;
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return false;
}
}

View File

@ -0,0 +1,222 @@
package com.taosdata.jdbc.rs.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
public class HttpClientPoolUtil {
public static PoolingHttpClientConnectionManager cm = null;
public static CloseableHttpClient httpClient = null;
/**
* 默认content 类型
*/
private static final String DEFAULT_CONTENT_TYPE = "application/json";
/**
* 默认请求超时时间30s
*/
private static final int DEFAULT_TIME_OUT = 15000;
private static final int count = 32;
private static final int totalCount = 1000;
private static final int Http_Default_Keep_Time = 15000;
/**
* 初始化连接池
*/
public static synchronized void initPools() {
if (httpClient == null) {
cm = new PoolingHttpClientConnectionManager();
cm.setDefaultMaxPerRoute(count);
cm.setMaxTotal(totalCount);
httpClient = HttpClients.custom().setKeepAliveStrategy(defaultStrategy).setConnectionManager(cm).build();
}
}
/**
* Http connection keepAlive 设置
*/
public static ConnectionKeepAliveStrategy defaultStrategy = (response, context) -> {
HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
int keepTime = Http_Default_Keep_Time * 1000;
while (it.hasNext()) {
HeaderElement headerElement = it.nextElement();
String param = headerElement.getName();
String value = headerElement.getValue();
if (value != null && param.equalsIgnoreCase("timeout")) {
try {
return Long.parseLong(value) * 1000;
} catch (Exception e) {
new Exception(
"format KeepAlive timeout exception, exception:" + e.toString())
.printStackTrace();
}
}
}
return keepTime;
};
public static CloseableHttpClient getHttpClient() {
return httpClient;
}
public static PoolingHttpClientConnectionManager getHttpConnectionManager() {
return cm;
}
/**
* 执行http post请求
* 默认采用Content-Typeapplication/jsonAcceptapplication/json
*
* @param uri 请求地址
* @param data 请求数据
* @return responseBody
*/
public static String execute(String uri, String data) {
long startTime = System.currentTimeMillis();
HttpEntity httpEntity = null;
HttpEntityEnclosingRequestBase method = null;
String responseBody = "";
try {
if (httpClient == null) {
initPools();
}
method = (HttpEntityEnclosingRequestBase) getRequest(uri, HttpPost.METHOD_NAME, DEFAULT_CONTENT_TYPE, 0);
method.setEntity(new StringEntity(data));
HttpContext context = HttpClientContext.create();
CloseableHttpResponse httpResponse = httpClient.execute(method, context);
httpEntity = httpResponse.getEntity();
if (httpEntity != null) {
responseBody = EntityUtils.toString(httpEntity, "UTF-8");
}
} catch (Exception e) {
if (method != null) {
method.abort();
}
// e.printStackTrace();
// logger.error("execute post request exception, url:" + uri + ", exception:" + e.toString()
// + ", cost time(ms):" + (System.currentTimeMillis() - startTime));
new Exception("execute post request exception, url:"
+ uri + ", exception:" + e.toString() +
", cost time(ms):" + (System.currentTimeMillis() - startTime))
.printStackTrace();
} finally {
if (httpEntity != null) {
try {
EntityUtils.consumeQuietly(httpEntity);
} catch (Exception e) {
// e.printStackTrace();
// logger.error("close response exception, url:" + uri + ", exception:" + e.toString()
// + ", cost time(ms):" + (System.currentTimeMillis() - startTime));
new Exception(
"close response exception, url:" + uri +
", exception:" + e.toString()
+ ", cost time(ms):" + (System.currentTimeMillis() - startTime))
.printStackTrace();
}
}
}
return responseBody;
}
/**
* * 创建请求
*
* @param uri 请求url
* @param methodName 请求的方法类型
* @param contentType contentType类型
* @param timeout 超时时间
* @return HttpRequestBase 返回类型
* @author lisc
*/
public static HttpRequestBase getRequest(String uri, String methodName, String contentType, int timeout) {
if (httpClient == null) {
initPools();
}
HttpRequestBase method;
if (timeout <= 0) {
timeout = DEFAULT_TIME_OUT;
}
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(timeout * 1000)
.setConnectTimeout(timeout * 1000).setConnectionRequestTimeout(timeout * 1000)
.setExpectContinueEnabled(false).build();
if (HttpPut.METHOD_NAME.equalsIgnoreCase(methodName)) {
method = new HttpPut(uri);
} else if (HttpPost.METHOD_NAME.equalsIgnoreCase(methodName)) {
method = new HttpPost(uri);
} else if (HttpGet.METHOD_NAME.equalsIgnoreCase(methodName)) {
method = new HttpGet(uri);
} else {
method = new HttpPost(uri);
}
if (StringUtils.isBlank(contentType)) {
contentType = DEFAULT_CONTENT_TYPE;
}
method.addHeader("Content-Type", contentType);
method.addHeader("Accept", contentType);
method.setConfig(requestConfig);
return method;
}
/**
* 执行GET 请求
*
* @param uri 网址
* @return responseBody
*/
public static String execute(String uri) {
long startTime = System.currentTimeMillis();
HttpEntity httpEntity = null;
HttpRequestBase method = null;
String responseBody = "";
try {
if (httpClient == null) {
initPools();
}
method = getRequest(uri, HttpGet.METHOD_NAME, DEFAULT_CONTENT_TYPE, 0);
HttpContext context = HttpClientContext.create();
CloseableHttpResponse httpResponse = httpClient.execute(method, context);
httpEntity = httpResponse.getEntity();
if (httpEntity != null) {
responseBody = EntityUtils.toString(httpEntity, "UTF-8");
// logger.info("请求URL: " + uri + "+ 返回状态码:" + httpResponse.getStatusLine().getStatusCode());
}
} catch (Exception e) {
if (method != null) {
method.abort();
}
e.printStackTrace();
// logger.error("execute get request exception, url:" + uri + ", exception:" + e.toString() + ",cost time(ms):"
// + (System.currentTimeMillis() - startTime));
System.out.println("log:调用 HttpClientPoolUtil execute get request exception, url:" + uri + ", exception:" + e.toString() + ",cost time(ms):"
+ (System.currentTimeMillis() - startTime));
} finally {
if (httpEntity != null) {
try {
EntityUtils.consumeQuietly(httpEntity);
} catch (Exception e) {
// e.printStackTrace();
// logger.error("close response exception, url:" + uri + ", exception:" + e.toString()
// + ",cost time(ms):" + (System.currentTimeMillis() - startTime));
new Exception("close response exception, url:" + uri + ", exception:" + e.toString()
+ ",cost time(ms):" + (System.currentTimeMillis() - startTime))
.printStackTrace();
}
}
}
return responseBody;
}
}

View File

@ -0,0 +1,40 @@
package com.taosdata.jdbc.rs;
import org.junit.Assert;
import org.junit.Test;
import java.sql.*;
public class RestfulDriverTest {
@Test
public void testCase001() {
try {
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
Connection connection = DriverManager.getConnection("jdbc:TAOS-RS://master:6041/?user=root&password=taosdata");
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select * from log.log");
ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) {
for (int i = 1; i <= metaData.getColumnCount(); i++) {
String column = metaData.getColumnLabel(i);
String value = resultSet.getString(i);
System.out.print(column + ":" + value + "\t");
}
System.out.println();
}
statement.close();
connection.close();
} catch (SQLException | ClassNotFoundException e) {
e.printStackTrace();
}
}
@Test
public void testAcceptUrl() throws SQLException {
Driver driver = new RestfulDriver();
boolean isAccept = driver.acceptsURL("jdbc:TAOS-RS://master:6041");
Assert.assertTrue(isAccept);
}
}

View File

@ -148,10 +148,12 @@ static void *monitorThreadFunc(void *param) {
} }
if (tsMonitor.state == MON_STATE_NOT_INIT) { if (tsMonitor.state == MON_STATE_NOT_INIT) {
int code = 0;
for (; tsMonitor.cmdIndex < MON_CMD_MAX; ++tsMonitor.cmdIndex) { for (; tsMonitor.cmdIndex < MON_CMD_MAX; ++tsMonitor.cmdIndex) {
monitorBuildMonitorSql(tsMonitor.sql, tsMonitor.cmdIndex); monitorBuildMonitorSql(tsMonitor.sql, tsMonitor.cmdIndex);
void *res = taos_query(tsMonitor.conn, tsMonitor.sql); void *res = taos_query(tsMonitor.conn, tsMonitor.sql);
int code = taos_errno(res); code = taos_errno(res);
taos_free_result(res); taos_free_result(res);
if (code != 0) { if (code != 0) {
@ -162,7 +164,7 @@ static void *monitorThreadFunc(void *param) {
} }
} }
if (tsMonitor.start) { if (tsMonitor.start && code == 0) {
tsMonitor.state = MON_STATE_INITED; tsMonitor.state = MON_STATE_INITED;
} }
} }

View File

@ -332,7 +332,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr
point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->col.offset}; point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->col.offset};
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes}; point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes};
point = (SPoint){.key = pFillInfo->start, .val = val1}; point = (SPoint){.key = pFillInfo->currentKey, .val = val1};
taosGetLinearInterpolationVal(type, &point1, &point2, &point); taosGetLinearInterpolationVal(type, &point1, &point2, &point);
} }
} else { } else {

View File

@ -106,7 +106,7 @@ typedef struct {
int8_t nacks; int8_t nacks;
int8_t confirmed; int8_t confirmed;
int32_t code; int32_t code;
uint64_t time; int64_t time;
} SFwdInfo; } SFwdInfo;
typedef struct { typedef struct {

View File

@ -1204,14 +1204,17 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
if (pSyncFwds) {; if (pSyncFwds) {
uint64_t time = taosGetTimestampMs(); int64_t time = taosGetTimestampMs();
if (pSyncFwds->fwds > 0) { if (pSyncFwds->fwds > 0) {
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo;
if (time - pFwdInfo->time < 2000) break; if (ABS(time - pFwdInfo->time) < 2000) break;
sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId,
pFwdInfo->version, time, pFwdInfo->time);
syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL);
} }

View File

@ -31,35 +31,33 @@ class TDTestCase:
print("==============step1") print("==============step1")
tdSql.execute( tdSql.execute(
"create table if not exists stb (ts timestamp, col1 int, col2 int, col3 int) tags(loc nchar(20), id int)") "create table if not exists stb (ts timestamp, col1 int, col2 int, col3 int) tags(loc nchar(20), id int)")
tdSql.execute(
"insert into tb0 using stb tags('beijing', 1) values(%s, 1, 1, 1)(%s, 2, 2, 2)(%s, 3, 3, 3)(%s, 4, 4, 4)" % (self.ts, self.ts + 1000000, self.ts + 2000000, self.ts + 3000000)) currTs = self.ts
tdSql.execute(
"insert into tb1 using stb tags('beijing', 2) values(%s, 1, 1, 1)(%s, 2, 2, 2)(%s, 3, 3, 3)(%s, 4, 4, 4)" % (self.ts + 4000000, self.ts + 5000000, self.ts + 6000000, self.ts + 7000000))
tdSql.execute(
"insert into tb2 using stb tags('shanghai', 1) values(%s, 1, 1, 1)(%s, 2, 2, 2)(%s, 3, 3, 3)(%s, 4, 4, 4)" % (self.ts + 8000000, self.ts + 9000000, self.ts + 10000000, self.ts + 11000000))
tdSql.execute(
"insert into tb3 using stb tags('shanghai', 2) values(%s, 1, 1, 1)(%s, 2, 2, 2)(%s, 3, 3, 3)(%s, 4, 4, 4)" % (self.ts + 12000000, self.ts + 13000000, self.ts + 14000000, self.ts + 15000000))
tdSql.execute(
"insert into tb4 using stb tags('shanghai', 3) values(%s, null, null, null)(%s, null, null, null)(%s, null, null, null)(%s, null, null, null)" % (self.ts + 16000000, self.ts + 17000000, self.ts + 18000000, self.ts + 19000000))
tdSql.query("select first(col1) - avg(col1) from stb where ts > '2018-09-17 08:00:00.000' and ts < '2018-09-17 14:16:41.000' interval(1h)")
tdSql.checkRows(5)
tdSql.checkData(0, 1, -1.5)
tdSql.checkData(1, 1, -1.5)
tdSql.checkData(2, 1, -1.0)
tdSql.checkData(3, 1, 1.5)
tdSql.checkData(4, 1, 0)
tdSql.query("select first(col1) - avg(col1) from stb where ts > '2018-09-17 08:00:00.000' and ts < '2018-09-17 14:16:41.000' interval(1h) fill(null)") for i in range(100):
tdSql.checkRows(7) sql = "create table tb%d using stb tags('city%d', 1)" % (i, i)
tdSql.execute(sql)
sql = "insert into tb%d values" % i
for j in range(5):
val = 1 + j
sql += "(%d, %d, %d, %d)" % (currTs, val, val, val)
currTs += 1000000
tdSql.execute(sql)
tdSql.query("select first(col1) - avg(col1) from stb where ts > '2018-09-17 08:00:00.000' and ts < '2018-09-23 04:36:40.000' interval(1h)")
tdSql.checkRows(139)
tdSql.query("select first(col1) - avg(col1) from stb where ts > '2018-09-17 08:00:00.000' and ts < '2018-09-23 04:36:40.000' interval(1h) fill(null)")
tdSql.checkRows(141)
tdSql.checkData(0, 1, None) tdSql.checkData(0, 1, None)
tdSql.checkData(6, 1, None) tdSql.checkData(140, 1, None)
tdSql.query("select max(col1) - min(col1) from stb where ts > '2018-09-17 08:00:00.000' and ts < '2018-09-17 14:16:41.000' and id = 1 group by loc, id") tdSql.query("select max(col1) - min(col1) from stb where ts > '2018-09-17 08:00:00.000' and ts < '2018-09-23 04:36:40.000' and id = 1 group by loc, id")
tdSql.checkRows(2) rows = tdSql.queryRows
tdSql.query("select spread(col1) from stb where ts > '2018-09-17 08:00:00.000' and ts < '2018-09-17 14:16:41.000' and id = 1 group by loc, id") tdSql.query("select spread(col1) from stb where ts > '2018-09-17 08:00:00.000' and ts < '2018-09-23 04:36:40.000' and id = 1 group by loc, id")
tdSql.checkRows(2) tdSql.checkRows(rows)
def stop(self): def stop(self):
tdSql.close() tdSql.close()

View File

@ -136,9 +136,8 @@ sql select join_mt0.ts, join_mt1.t1, join_mt0.t1, join_mt1.tbname, join_mt0.tbna
#1970-01-01 08:01:40.790 | 10 | 945.000000000 | 90 | true | true | 0 | #1970-01-01 08:01:40.790 | 10 | 945.000000000 | 90 | true | true | 0 |
sql_error select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc limit 20 offset 19; sql_error select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc limit 20 offset 19;
sql select count(join_mt0.c1), sum(join_mt0.c2)/count(*), avg(c2), first(join_mt0.c5), last(c7) from join_mt0 interval(10a) group by join_mt0.t1 order by join_mt0.ts desc; sql select count(join_mt0.c1), sum(join_mt0.c2)/count(*), avg(c2), first(join_mt0.c5), last(c7) from join_mt0 interval(10a) group by join_mt0.t1 order by join_mt0.ts desc;
if $rows != 100 then if $rows != 300 then
return -1 return -1
endi endi
@ -147,7 +146,7 @@ if $data00 != @70-01-01 08:01:40.990@ then
return -1 return -1
endi endi
if $data01 != 30 then if $data01 != 10 then
return -1 return -1
endi endi
@ -168,7 +167,7 @@ if $data05 != 1 then
return -1 return -1
endi endi
if $data06 != 2 then if $data06 != 0 then
return -1 return -1
endi endi
@ -177,7 +176,7 @@ if $data10 != @70-01-01 08:01:40.980@ then
return -1 return -1
endi endi
if $data11 != 30 then if $data11 != 10 then
return -1 return -1
endi endi
@ -198,7 +197,7 @@ if $data15 != 1 then
return -1 return -1
endi endi
if $data16 != 2 then if $data16 != 0 then
return -1 return -1
endi endi

View File

@ -277,6 +277,8 @@ cd ../../../debug; make
./test.sh -f unique/db/replica_part.sim ./test.sh -f unique/db/replica_part.sim
./test.sh -f unique/dnode/alternativeRole.sim ./test.sh -f unique/dnode/alternativeRole.sim
./test.sh -f unique/dnode/monitor.sim
./test.sh -f unique/dnode/monitor_bug.sim
./test.sh -f unique/dnode/simple.sim ./test.sh -f unique/dnode/simple.sim
./test.sh -f unique/dnode/balance1.sim ./test.sh -f unique/dnode/balance1.sim
./test.sh -f unique/dnode/balance2.sim ./test.sh -f unique/dnode/balance2.sim

View File

@ -0,0 +1,92 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/cfg.sh -n dnode1 -c role -v 1
system sh/cfg.sh -n dnode2 -c role -v 2
system sh/cfg.sh -n dnode1 -c wallevel -v 1
system sh/cfg.sh -n dnode2 -c wallevel -v 1
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode1 -c minTablesPerVnode -v 10
system sh/cfg.sh -n dnode2 -c minTablesPerVnode -v 10
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 10
system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 10
system sh/cfg.sh -n dnode1 -c monitor -v 1
system sh/cfg.sh -n dnode2 -c monitor -v 1
print ========== step1
system sh/exec.sh -n dnode1 -s start
sql connect
sleep 5000
sql show dnodes
print dnode1 openVnodes $data3_1
if $data2_1 != 0 then
return -1
endi
print ========== step2
sql create dnode $hostname2
system sh/exec.sh -n dnode2 -s start
sleep 10000
sql show dnodes
print dnode1 openVnodes $data2_1
print dnode2 openVnodes $data2_2
if $data2_1 != 0 then
return -1
endi
if $data2_2 != 1 then
return -1
endi
print ========== step3
sql show log.tables
print $data00
print $data10
print $data20
print $data30
print $data40
print $data50
if $rows != 5 then
return -1
endi
print ========== step4
sql select * from log.dn1
print $rows
$rows1 = $rows
sleep 3000
sql select * from log.dn1
print $rows
$rows2 = $rows
if $rows2 <= $rows1 then
return -1
endi
print ========== step5
sql select * from log.dn2
print $rows
$rows1 = $rows
sleep 3000
sql select * from log.dn2
print $rows
$rows2 = $rows
if $rows2 <= $rows1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT

View File

@ -18,8 +18,8 @@ sql connect
sleep 5000 sleep 5000
sql show dnodes sql show dnodes
print dnode1 openVnodes $data3_1 print dnode1 openVnodes $data2_1
if $data3_1 != 3 then if $data2_1 != 1 then
return -1 return -1
endi endi
@ -31,22 +31,21 @@ $x = 0
show2: show2:
$x = $x + 1 $x = $x + 1
sleep 2000 sleep 2000
if $x == 30 then if $x == 10 then
return -1 return -1
endi endi
sql show dnodes sql show dnodes
print dnode1 openVnodes $data3_1 print dnode1 openVnodes $data2_1
print dnode2 openVnodes $data3_2 print dnode2 openVnodes $data2_2
if $data3_1 != 4 then if $data2_1 != 0 then
goto show2 goto show2
endi endi
if $data3_2 != 3 then if $data2_2 != 1 then
goto show2 goto show2
endi endi
print ========== step3 print ========== step3
sleep 3000
sql show log.tables sql show log.tables
print $data00 print $data00
@ -56,6 +55,23 @@ print $data30
print $data40 print $data40
print $data50 print $data50
if $rows != 5 then if $rows != 4 then
return -1 return -1
endi endi
print ========== step4
sql select * from log.dn1
print $rows
$rows1 = $rows
sleep 3000
sql select * from log.dn1
print $rows
$rows2 = $rows
if $rows2 <= $rows1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT