Merge branch 'develop' into coverity_scan
commit 03ee481d76
@@ -178,7 +178,7 @@ matrix:
cd ${TRAVIS_BUILD_DIR}
lcov -d . --capture --rc lcov_branch_coverage=1 -o coverage.info
lcov --remove coverage.info '*tests*' '*deps*' -o coverage.info
lcov --remove coverage.info '*/tests/*' '*/test/*' '*/deps/*' -o coverage.info
lcov -l --rc lcov_branch_coverage=1 coverage.info || travis_terminate $?

gem install coveralls-lcov

@@ -191,14 +191,14 @@ typedef struct SDataBlockList { // todo remove
} SDataBlockList;

typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert/import type
char slidingTimeUnit;

STimeWindow window;
int64_t intervalTime; // aggregation time interval
int64_t slidingTime; // sliding window in mseconds
SSqlGroupbyExpr groupbyExpr; // group by tags info
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert/import type
char slidingTimeUnit;

STimeWindow window;
int64_t intervalTime; // aggregation time interval
int64_t slidingTime; // sliding window in mseconds
SSqlGroupbyExpr groupbyExpr; // group by tags info

SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;

@@ -207,11 +207,11 @@ typedef struct SQueryInfo {
SLimitVal slimit;
STagCond tagCond;
SOrderVal order;
int16_t fillType; // interpolate type
int16_t fillType; // final result fill type
int16_t numOfTables;
STableMetaInfo **pTableMetaInfo;
struct STSBuf * tsBuf;
int64_t * fillVal; // default value for interpolation
int64_t * fillVal; // default value for fill
char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause

@@ -222,15 +222,15 @@ typedef struct SQueryInfo {
typedef struct {
int command;
uint8_t msgType;

bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta
int8_t dataSourceType; // load data from file or not
bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta
int8_t dataSourceType; // load data from file or not

union {
int32_t count;
int32_t numOfTablesInSubmit;
};

int32_t insertType;
int32_t clauseIndex; // index of multiple subclause query
int8_t parseFinished;
short numOfCols;

@@ -239,14 +239,12 @@ typedef struct {
int32_t payloadLen;
SQueryInfo **pQueryInfo;
int32_t numOfClause;
char * curSql; // current sql, resume position of sql after parsing paused
void * pTableList; // referred table involved in sql
int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams;

SDataBlockList *pDataBlocks; // submit data blocks after parsing sql
char * curSql; // current sql, resume position of sql after parsing paused
void * pTableList; // referred table involved in sql

// for parameter ('?') binding and batch processing
int32_t batchSize;
int32_t numOfParams;
} SSqlCmd;

typedef struct SResRec {

@@ -316,7 +314,6 @@ typedef struct SSqlObj {
SRpcIpSet ipList;
char freed : 4;
char listed : 4;
uint32_t insertType;
tsem_t rspSem;
SSqlCmd cmd;
SSqlRes res;

@@ -361,7 +358,7 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet);
int tscProcessSql(SSqlObj *pSql);

int tscRenewMeterMeta(SSqlObj *pSql, char *tableId);
int tscRenewTableMeta(SSqlObj *pSql, char *tableId);
void tscQueueAsyncRes(SSqlObj *pSql);

void tscQueueAsyncError(void(*fp), void *param, int32_t code);

@@ -442,15 +442,17 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}

if (pSql->pStream == NULL) {
// check if it is a sub-query of super table query first, if true, enter another routine
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) {

// check if it is a sub-query of super table query first, if true, enter another routine
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY)) {
tscTrace("%p update table meta in local cache, continue to process sql and send corresponding subquery", pSql);

STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pTableMetaInfo->pTableMeta == NULL){
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_SUCCESS);
}
}

assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vgroupIndex >= 0 && pSql->param != NULL);

@@ -460,32 +462,37 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0);

tscTrace("%p get metricMeta during super table query successfully", pSql);

code = tscGetSTableVgroupInfo(pSql, 0);
pRes->code = code;

if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
} else { // normal async query continues
// NOTE: the vgroupInfo for the queried super table must be existed here.
assert(pTableMetaInfo->vgroupList != NULL);
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
return;
}
} else { // continue to process normal async query
if (pCmd->parseFinished) {
tscTrace("%p re-send data to vnode in table Meta callback since sql parsed completed", pSql);

tscTrace("%p update table meta in local cache, continue to process sql and send corresponding query", pSql);

STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_SUCCESS);

if (pTableMetaInfo->pTableMeta) {
// todo update the submit message according to the new table meta
// 1. table uid, 2. ip address
code = tscSendMsgToServer(pSql);
if (code == TSDB_CODE_SUCCESS) return;

// if failed to process sql, go to error handler
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
return;
}
// // todo update the submit message according to the new table meta
// // 1. table uid, 2. ip address
// code = tscSendMsgToServer(pSql);
// if (code == TSDB_CODE_SUCCESS) return;
// }
} else {
tscTrace("%p continue parse sql after get table meta", pSql);

code = tsParseSql(pSql, false);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STMT_INSERT) == TSDB_QUERY_TYPE_STMT_INSERT) {
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_SUCCESS && pTableMetaInfo->pTableMeta != NULL);

(*pSql->fp)(pSql->param, pSql, code);
return;
}
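Note: the hunks above replace open-coded bit tests such as (pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY with the TSDB_QUERY_HAS_TYPE macro. The macro's definition is not part of this diff; the following is only a sketch of the form such a flag-test helper usually takes, written as an assumption rather than the project's actual definition.

/* Assumed form only; the real TSDB_QUERY_HAS_TYPE lives elsewhere in the tree. */
#define TSDB_QUERY_HAS_TYPE(x, _type)  (((x) & (_type)) == (_type))

/* Usage mirroring the new code in tscTableMetaCallBack: */
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY)) {
  /* this is a sub-query of a super table query; take the subquery path */
}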
@@ -1293,7 +1293,7 @@ static void max_function_f(SQLFunctionCtx *pCtx, int32_t index) {
minMax_function_f(pCtx, index, 0);

SResultInfo *pResInfo = GET_RES_INFO(pCtx);
if (pResInfo->hasResult == DATA_SET_FLAG) {
if (pResInfo->hasResult == DATA_SET_FLAG && pResInfo->superTableQ) {
char *flag = pCtx->aOutputBuf + pCtx->inputBytes;
*flag = DATA_SET_FLAG;
}

@@ -1309,7 +1309,7 @@ static void min_function_f(SQLFunctionCtx *pCtx, int32_t index) {
minMax_function_f(pCtx, index, 1);

SResultInfo *pResInfo = GET_RES_INFO(pCtx);
if (pResInfo->hasResult == DATA_SET_FLAG) {
if (pResInfo->hasResult == DATA_SET_FLAG && pResInfo->superTableQ) {
char *flag = pCtx->aOutputBuf + pCtx->inputBytes;
*flag = DATA_SET_FLAG;
}

@@ -1314,7 +1314,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);

TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, pSql->insertType);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, pCmd->insertType);

sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
if (sToken.type != TK_INTO) {

@@ -1342,7 +1342,7 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
* Set the fp before parse the sql string, in case of getTableMeta failed, in which
* the error handle callback function can rightfully restore the user-defined callback function (fp).
*/
if (initialParse && (pSql->insertType != TSDB_QUERY_TYPE_STMT_INSERT)) {
if (initialParse && (pSql->cmd.insertType != TSDB_QUERY_TYPE_STMT_INSERT)) {
pSql->fetchFp = pSql->fp;
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
}

@@ -1354,9 +1354,7 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
return ret;
}

SSqlInfo SQLInfo = {0};
tSQLParse(&SQLInfo, pSql->sqlstr);

SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr);
ret = tscToSQLCmd(pSql, &SQLInfo);
SQLInfoDestroy(&SQLInfo);
}

@@ -451,7 +451,7 @@ static int insertStmtExecute(STscStmt* stmt) {

pRes->qhandle = 0;

pSql->insertType = 0;
pSql->cmd.insertType = 0;
pSql->fetchFp = waitForQueryRsp;
pSql->fp = (void(*)())tscHandleMultivnodeInsert;

@@ -515,7 +515,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
SSqlRes *pRes = &pSql->res;
pSql->param = (void*) pSql;
pSql->fp = waitForQueryRsp;
pSql->insertType = TSDB_QUERY_TYPE_STMT_INSERT;
pSql->cmd.insertType = TSDB_QUERY_TYPE_STMT_INSERT;

if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
tscError("%p failed to malloc payload buffer", pSql);

@@ -515,7 +515,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (ret != 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
}
}

pCmd->parseFinished = 1;
return TSDB_CODE_SUCCESS; // do not build query message here

@@ -543,6 +543,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "not support sql expression");
}

pSql->cmd.parseFinished = true;
return tscBuildMsg[pCmd->command](pSql, pInfo);
}

@@ -1185,7 +1185,9 @@ bool needToMerge(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer, tFilePage
int32_t ret = 0; // merge all result by default

int16_t functionId = pLocalReducer->pCtx[0].functionId;
if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_ARITHM) { // column projection query

// todo opt performance
if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pQueryInfo, 0))) { // column projection query
ret = 1; // disable merge procedure
} else {
tOrderDescriptor *pDesc = pLocalReducer->pDesc;

@@ -239,16 +239,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
/*
* not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
* the virtual node may have not create table till now, so try again by using the new metermeta.
* 2. this requested table may have been removed by other client, so we need to renew the
* metermeta here.
*
* not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been
* removed. So, renew metermeta and try again.
* not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
*/
if (pCmd->command == TSDB_SQL_CONNECT) {
rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
rpcFreeCont(rpcMsg->pCont);

@@ -258,8 +248,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
rpcFreeCont(rpcMsg->pCont);
return;
} else if (pCmd->command == TSDB_SQL_META) {
// rpcFreeCont(rpcMsg->pCont);
// return;
// get table meta query will not retry, do nothing
} else {
tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);

@@ -267,13 +256,14 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
if (pSql->retry > pSql->maxRetry) {
tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
} else {
rpcMsg->code = tscRenewMeterMeta(pSql, pTableMetaInfo->name);
if (pTableMetaInfo->pTableMeta) {
tscSendMsgToServer(pSql);
rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name);

// if there is an error occurring, proceed to the following error handling procedure.
// todo add test cases
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
rpcFreeCont(rpcMsg->pCont);
return;
}

rpcFreeCont(rpcMsg->pCont);
return;
}
}
}

@@ -330,9 +320,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
}
}

if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);

}

if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? pRes->numOfRows: pRes->code;

@@ -431,7 +421,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {

/*
* here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
* sub-queries not correctly released and master sql object of metric query reaches an abnormal state.
* sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
*/
pSql->pSubs[i]->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
//taosStopRpcConn(pSql->pSubs[i]->thandle);

@@ -565,7 +555,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char

pQueryMsg->numOfTables = htonl(1); // set the number of tables
pMsg += sizeof(STableIdInfo);
} else {
} else { // it is a subquery of the super table query, this IP info is acquired from vgroupInfo
int32_t index = pTableMetaInfo->vgroupIndex;
int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(index >= 0 && index < numOfVgroups);

@@ -1822,7 +1812,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}

tscTrace("%p recv table meta: %"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->uid, pTableMeta->sid, pTableMetaInfo->name);
tscTrace("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->uid, pTableMeta->sid, pTableMetaInfo->name);
free(pTableMeta);

return TSDB_CODE_SUCCESS;

@@ -2358,7 +2348,7 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf

int32_t code = tscProcessSql(pNew);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify upper application that current process need to be terminated
}

return code;
@@ -2389,56 +2379,26 @@ int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
return tscGetTableMeta(pSql, pTableMetaInfo);
}

/*
* in handling the renew metermeta problem during insertion,
*
* If the meter is created on demand during insertion, the routine usually waits for a short
* period to re-issue the getMeterMeta msg, in which makes a greater change that vnode has
* successfully created the corresponding table.
*/
static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
if (pCmd->command == TSDB_SQL_INSERT) {
taosMsleep(50); // todo: global config
}
}

/**
* in renew metermeta, do not retrieve metadata in cache.
* retrieve table meta from mnode, and update the local table meta cache.
* @param pSql sql object
* @param tableId meter id
* @param tableId table full name
* @return status code
*/
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
int code = 0;

// handle table meta renew process
int tscRenewTableMeta(SSqlObj *pSql, char *tableId) {
SSqlCmd *pCmd = &pSql->cmd;

SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);

/*
* 1. only update the metermeta in force model metricmeta is not updated
* 2. if get metermeta failed, still get the metermeta
*/
if (pTableMetaInfo->pTableMeta == NULL || !tscQueryOnSTable(pCmd)) {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pTableMetaInfo->pTableMeta) {
tscTrace("%p update table meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->uid, pTableMeta);
}

tscWaitingForCreateTable(pCmd);
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);

code = getTableMetaFromMgmt(pSql, pTableMetaInfo); // todo ??
} else {
tscTrace("%p metric query not update metric meta, numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
tscGetNumOfTags(pTableMetaInfo->pTableMeta), pCmd->numOfCols, pTableMetaInfo->pTableMeta->uid,
pTableMetaInfo->pTableMeta);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pTableMetaInfo->pTableMeta) {
tscTrace("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->uid, pTableMeta);
}

return code;
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true);
return getTableMetaFromMgmt(pSql, pTableMetaInfo);
}

static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
@@ -322,7 +322,7 @@ enum {
#define NORMAL_ARITHMETIC 1
#define AGG_ARIGHTMEIC 2

int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pSql);
SSqlInfo qSQLParse(const char *str);

#ifdef __cplusplus
}

@@ -373,7 +373,6 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
SPosInfo pos = {-1, -1};
createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &pos);
}

pWindowResInfo->capacity = newCap;
}

@@ -1566,11 +1565,6 @@ static bool isFirstLastRowQuery(SQuery *pQuery) {
return false;
}

static UNUSED_FUNC bool notHasQueryTimeRange(SQuery *pQuery) {
return (pQuery->window.skey == 0 && pQuery->window.ekey == INT64_MAX && QUERY_IS_ASC_QUERY(pQuery)) ||
(pQuery->window.skey == INT64_MAX && pQuery->window.ekey == 0 && (!QUERY_IS_ASC_QUERY(pQuery)));
}

static bool needReverseScan(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;

@@ -1768,61 +1762,6 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool stableQuery) {
}
}

static UNUSED_FUNC void doSetInterpVal(SQLFunctionCtx *pCtx, TSKEY ts, int16_t type, int32_t index, char *data) {
assert(pCtx->param[index].pz == NULL);

int32_t len = 0;
size_t t = 0;

if (type == TSDB_DATA_TYPE_BINARY) {
t = strlen(data);

len = t + 1 + TSDB_KEYSIZE;
pCtx->param[index].pz = calloc(1, len);
} else if (type == TSDB_DATA_TYPE_NCHAR) {
t = wcslen((const wchar_t *)data);

len = (t + 1) * TSDB_NCHAR_SIZE + TSDB_KEYSIZE;
pCtx->param[index].pz = calloc(1, len);
} else {
len = TSDB_KEYSIZE * 2;
pCtx->param[index].pz = malloc(len);
}

pCtx->param[index].nType = TSDB_DATA_TYPE_BINARY;

char *z = pCtx->param[index].pz;
*(TSKEY *)z = ts;
z += TSDB_KEYSIZE;

switch (type) {
case TSDB_DATA_TYPE_FLOAT:
*(double *)z = GET_FLOAT_VAL(data);
break;
case TSDB_DATA_TYPE_DOUBLE:
*(double *)z = GET_DOUBLE_VAL(data);
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TIMESTAMP:
*(int64_t *)z = GET_INT64_VAL(data);
break;
case TSDB_DATA_TYPE_BINARY:
strncpy(z, data, t);
break;
case TSDB_DATA_TYPE_NCHAR: {
wcsncpy((wchar_t *)z, (const wchar_t *)data, t);
} break;
default:
assert(0);
}

pCtx->param[index].nLen = len;
}

static int32_t getInitialPageNum(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t INITIAL_RESULT_ROWS_VALUE = 16;

@@ -4071,45 +4010,19 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, rows, 4096, type);
}

setQueryStatus(pQuery, QUERY_NOT_COMPLETED);

/*
* in case of last_row query without query range, we set the query timestamp to be
* STable->lastKey. Otherwise, keep the initial query time range unchanged.
*/
// if (isFirstLastRowQuery(pQuery)) {
// if (!normalizeUnBoundLastRowQuery(pQInfo, &interpInfo)) {
// sem_post(&pQInfo->dataReady);
// pointInterpSupporterDestroy(&interpInfo);
// return TSDB_CODE_SUCCESS;
// }
// }

if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
SFillColInfo* pColInfo = taosCreateFillColInfo(pQuery);
pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, 0, 0, pQuery->rec.capacity, pQuery->numOfOutput,
pQuery->slidingTime, pQuery->fillType, pColInfo);
}

// todo refactor
pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery);
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);

return TSDB_CODE_SUCCESS;
}

static UNUSED_FUNC bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, STableGroupInfo *pSidset) {
if (pGroupbyExpr == NULL || pGroupbyExpr->numOfGroupCols == 0) {
return false;
}

for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) {
SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, i);
if (pColIndex->flag == TSDB_COL_TAG) {
return true;
}
}

return false;
}

static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;

@@ -5907,10 +5820,11 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
_over:
tfree(tagCond);
tfree(tbnameCond);
tfree(pGroupColIndex);
taosArrayDestroy(pTableIdList);

//pQInfo already freed in initQInfo, but *pQInfo may not pointer to null;
if (code != TSDB_CODE_SUCCESS) {
//pQInfo already freed in initQInfo, but *pQInfo may not pointer to null;
*pQInfo = NULL;
}

@@ -26,16 +26,18 @@
#include "tstrbuild.h"
#include "queryLog.h"

int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pStr) {
SSqlInfo qSQLParse(const char *pStr) {
void *pParser = ParseAlloc(malloc);
pSQLInfo->valid = true;

SSqlInfo sqlInfo = {0};
sqlInfo.valid = true;

int32_t i = 0;
while (1) {
SSQLToken t0 = {0};

if (pStr[i] == 0) {
Parse(pParser, 0, t0, pSQLInfo);
Parse(pParser, 0, t0, &sqlInfo);
goto abort_parse;
}

@@ -49,19 +51,19 @@ int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pStr) {
break;
}
case TK_SEMI: {
Parse(pParser, 0, t0, pSQLInfo);
Parse(pParser, 0, t0, &sqlInfo);
goto abort_parse;
}

case TK_QUESTION:
case TK_ILLEGAL: {
snprintf(pSQLInfo->pzErrMsg, tListLen(pSQLInfo->pzErrMsg), "unrecognized token: \"%s\"", t0.z);
pSQLInfo->valid = false;
snprintf(sqlInfo.pzErrMsg, tListLen(sqlInfo.pzErrMsg), "unrecognized token: \"%s\"", t0.z);
sqlInfo.valid = false;
goto abort_parse;
}
default:
Parse(pParser, t0.type, t0, pSQLInfo);
if (pSQLInfo->valid == false) {
Parse(pParser, t0.type, t0, &sqlInfo);
if (sqlInfo.valid == false) {
goto abort_parse;
}
}

@@ -69,7 +71,7 @@ int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pStr) {

abort_parse:
ParseFree(pParser, free);
return 0;
return sqlInfo;
}

tSQLExprList *tSQLExprListAppend(tSQLExprList *pList, tSQLExpr *pNode, SSQLToken *pToken) {
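Note: the parser entry point changes from int32_t tSQLParse(SSqlInfo *, const char *) to SSqlInfo qSQLParse(const char *), which returns the parse result by value. A minimal caller sketch based only on the call sites shown in this diff (tscToSQLCmd and SQLInfoDestroy appear in the diff; checking the valid field for error handling is an assumption):

SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr);   /* parse the raw SQL into an SSqlInfo returned by value */
if (!SQLInfo.valid) {
  /* the tokenizer/parser rejected the statement; pzErrMsg holds the reason (assumption) */
}
int32_t ret = tscToSQLCmd(pSql, &SQLInfo);    /* translate the parsed info into the SQL command */
SQLInfoDestroy(&SQLInfo);                     /* release memory owned by the parse result */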
@@ -74,14 +74,16 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
STable *tsdbDecodeTable(void *cont, int contLen) {
STable *pTable = (STable *)calloc(1, sizeof(STable));
if (pTable == NULL) return NULL;
pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS);
if (pTable->schema == NULL) {
free(pTable);
return NULL;
}

void *ptr = cont;
T_READ_MEMBER(ptr, int8_t, pTable->type);
if (pTable->type != TSDB_CHILD_TABLE) {
pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS);
if (pTable->schema == NULL) {
free(pTable);
return NULL;
}
}
int len = *(int *)ptr;
ptr = (char *)ptr + sizeof(int);
pTable->name = calloc(1, len + VARSTR_HEADER_SIZE + 1);

@@ -620,7 +622,10 @@ static int tsdbFreeTable(STable *pTable) {
if (pTable->type == TSDB_CHILD_TABLE) {
kvRowFree(pTable->tagVal);
} else {
for (int i = 0; i < pTable->numOfSchemas; i++) tdFreeSchema(pTable->schema[i]);
if (pTable->schema) {
for (int i = 0; i < pTable->numOfSchemas; i++) tdFreeSchema(pTable->schema[i]);
free(pTable->schema);
}
}

if (pTable->type == TSDB_STREAM_TABLE) {

@@ -764,8 +764,8 @@ static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {

static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock,
bool isLast, bool isSuperBlock) {
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows &&
rowsToWrite <= pHelper->config.maxRowsPerFileBlock);
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && rowsToWrite <= pHelper->config.maxRowsPerFileBlock);
ASSERT(isLast ? rowsToWrite < pHelper->config.minRowsPerFileBlock : true);

SCompData *pCompData = (SCompData *)(pHelper->pBuffer);
int64_t offset = 0;

@@ -905,14 +905,15 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa

rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows);
if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
(blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd) > 0) {
(blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pHelper->config.minRowsPerFileBlock) &&
(pHelper->files.nLastF.fd) < 0) {
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0)
goto _err;
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
} else {
// Load
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
ASSERT(pHelper->pDataCols[0]->numOfRows == blockAtIdx(pHelper, blkIdx)->numOfRows);
ASSERT(pHelper->pDataCols[0]->numOfRows <= blockAtIdx(pHelper, blkIdx)->numOfRows);
// Merge
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
// Write

@@ -936,21 +937,21 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
// Key must overlap with the block
ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast);

TSKEY keyLimit =
(blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1;
TSKEY keyLimit = (blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : blockAtIdx(pHelper, blkIdx + 1)->keyFirst - 1;

// rows1: number of rows must merge in this block
int rows1 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast);
// rows2: max nuber of rows the block can have more
// rows2: max number of rows the block can have more
int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfRows;
// rows3: number of rows between this block and the next block
int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit);

ASSERT(rows3 >= rows1);

if ((rows2 >= rows1) &&
(( blockAtIdx(pHelper, blkIdx)->last) ||
((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd < 0)))) {
if ((rows2 >= rows1) && (blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
((!blockAtIdx(pHelper, blkIdx)->last) ||
((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock) &&
(pHelper->files.nLastF.fd < 0)))) {
rowsWritten = rows1;
bool isLast = false;
SFile *pFile = NULL;

@@ -964,7 +965,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa

if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err;
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
} else { // Load-Merge-Write
} else { // Load-Merge-Write
// Load
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false;

@@ -1106,16 +1107,16 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
if (pTCompBlock->numOfSubBlocks > 1) {
ptr = (void *)((char *)(pHelper->pCompInfo) + pTCompBlock->offset + pTCompBlock->len);
ptr = POINTER_SHIFT(pHelper->pCompInfo, pTCompBlock->offset);
break;
}
}

if (ptr == NULL) ptr = (void *)((char *)(pHelper->pCompInfo) + pIdx->len - sizeof(TSCKSUM));
if (ptr == NULL) ptr = POINTER_SHIFT(pHelper->pCompInfo, pIdx->len-sizeof(TSCKSUM));

size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo));
if (tsize > 0) {
memmove((void *)((char *)ptr + sizeof(SCompBlock) * 2), ptr, tsize);
memmove(POINTER_SHIFT(ptr, sizeof(SCompBlock) * 2), ptr, tsize);
for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SCompBlock) * 2);
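Note: the tsdbAddSubBlock hunk above swaps manual (char *) pointer arithmetic for the POINTER_SHIFT helper (and the first replacement also drops the + pTCompBlock->len term, so it is a behavioral fix, not just a rewrite). POINTER_SHIFT is not defined in this diff; the following is only a sketch of the usual shape of such a macro, offered as an assumption.

/* Assumed form only; the actual definition lives elsewhere in the tree. */
#define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b)))
/* so  POINTER_SHIFT(pHelper->pCompInfo, pTCompBlock->offset)
 * reads the same as  (void *)((char *)(pHelper->pCompInfo) + pTCompBlock->offset) */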
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_KVSTORE_H_
#define _TD_KVSTORE_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <stdint.h>

typedef int (*iterFunc)(void *, void *cont, int contLen);
typedef void (*afterFunc)(void *);

typedef struct {
int64_t size;
int64_t tombSize;
int64_t nRecords;
int64_t nDels;
} SStoreInfo;

typedef struct {
char * fname;
int fd;
char * fsnap;
int sfd;
char * fnew;
int nfd;
SHashObj * map;
iterFunc iFunc;
afterFunc aFunc;
void * appH;
SStoreInfo info;
} SKVStore;

int tdCreateKVStore(char *fname);
int tdDestroyKVStore();
SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH);
void tdCloseKVStore(SKVStore *pStore);
int tdKVStoreStartCommit(SKVStore *pStore);
int tdUpdateRecordInKVStore(SKVStore *pStore, uint64_t uid, void *cont, int contLen);
int tdKVStoreEndCommit(SKVStore *pStore);

#ifdef __cplusplus
}
#endif

#endif
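Note: the new header above declares the KV store API added by this merge. A minimal, hypothetical commit sequence based only on these declarations follows; the file path, uid, and record payload are made-up placeholders, error handling is abbreviated, and the comments about snapshot behavior reflect the tkvstore.c hunk below rather than anything beyond this diff.

#include "tkvstore.h"

static int restoreRecord(void *appH, void *cont, int contLen) {
  /* iterFunc callback, assumed to be invoked per stored record during open/recovery */
  return 0;
}

void kvstoreSketch(void) {
  const char *fname = "/tmp/meta.db";                /* placeholder path */
  if (tdCreateKVStore((char *)fname) != 0) return;   /* create the backing file and its header */

  SKVStore *pStore = tdOpenKVStore((char *)fname, restoreRecord, NULL, NULL);
  if (pStore == NULL) return;

  char rec[] = "encoded-table-meta";                 /* placeholder payload */
  if (tdKVStoreStartCommit(pStore) == 0) {           /* snapshots the header, begins a commit */
    tdUpdateRecordInKVStore(pStore, 1 /* uid */, rec, sizeof(rec));
    tdKVStoreEndCommit(pStore);                      /* rewrites header, fsyncs, drops the snapshot */
  }

  tdCloseKVStore(pStore);
}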
@@ -0,0 +1,292 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <errno.h>
#include <fcntl.h>
#include <libgen.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include "hash.h"
#include "os.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "tcoding.h"
#include "tkvstore.h"
#include "tulog.h"

#define TD_KVSTORE_HEADER_SIZE 512
#define TD_KVSTORE_MAJOR_VERSION 1
#define TD_KVSTORE_MAINOR_VERSION 0
#define TD_KVSTORE_SNAP_SUFFIX ".snap"
#define TD_KVSTORE_NEW_SUFFIX ".new"

static int tdInitKVStoreHeader(int fd, char *fname);
static void * tdEncodeStoreInfo(void *buf, SStoreInfo *pInfo);
// static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo);
static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH);
static char * tdGetKVStoreSnapshotFname(char *fdata);
static char * tdGetKVStoreNewFname(char *fdata);
static void tdFreeKVStore(SKVStore *pStore);
static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo);

int tdCreateKVStore(char *fname) {
char *tname = strdup(fname);
if (tname == NULL) return TSDB_CODE_COM_OUT_OF_MEMORY;

int fd = open(fname, O_RDWR | O_CREAT, 0755);
if (fd < 0) {
uError("failed to open file %s since %s", fname, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}

int code = tdInitKVStoreHeader(fd, fname);
if (code != TSDB_CODE_SUCCESS) return code;

if (fsync(fd) < 0) {
uError("failed to fsync file %s since %s", fname, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}

if (close(fd) < 0) {
uError("failed to close file %s since %s", fname, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}

return TSDB_CODE_SUCCESS;
}

SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) {
SKVStore *pStore = tdNewKVStore(fname, iFunc, aFunc, appH);
if (pStore == NULL) return NULL;

pStore->fd = open(pStore->fname, O_RDWR);
if (pStore->fd < 0) {
uError("failed to open file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}

if (access(pStore->fsnap, F_OK) == 0) {
uTrace("file %s exists, try to recover the KV store", pStore->fsnap);
pStore->sfd = open(pStore->fsnap, O_RDONLY);
if (pStore->sfd < 0) {
uError("failed to open file %s since %s", pStore->fsnap, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}

// TODO: rewind the file

close(pStore->sfd);
pStore->sfd = -1;
remove(pStore->fsnap);
}

// TODO: Recover from the file

return pStore;

_err:
if (pStore->fd > 0) {
close(pStore->fd);
pStore->fd = -1;
}
if (pStore->sfd > 0) {
close(pStore->sfd);
pStore->sfd = -1;
}
tdFreeKVStore(pStore);
return NULL;
}

int tdKVStoreStartCommit(SKVStore *pStore) {
pStore->fd = open(pStore->fname, O_RDWR);
if (pStore->fd < 0) {
uError("failed to open file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}

pStore->sfd = open(pStore->fsnap, O_WRONLY | O_CREAT, 0755);
if (pStore->sfd < 0) {
uError("failed to open file %s since %s", pStore->fsnap, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}

if (tsendfile(pStore->sfd, pStore->fd, NULL, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
uError("failed to send file %d bytes since %s", TD_KVSTORE_HEADER_SIZE, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}

if (fsync(pStore->sfd) < 0) {
uError("failed to fsync file %s since %s", pStore->fsnap, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}

if (close(pStore->sfd) < 0) {
uError("failed to close file %s since %s", pStore->fsnap, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pStore->sfd = -1;

return 0;

_err:
if (pStore->sfd > 0) {
close(pStore->sfd);
pStore->sfd = -1;
remove(pStore->fsnap);
}
if (pStore->fd > 0) {
close(pStore->fd);
pStore->fd = -1;
}
return -1;
}

int tdKVStoreEndCommit(SKVStore *pStore) {
ASSERT(pStore->fd > 0);

terrno = tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info));
if (terrno != TSDB_CODE_SUCCESS) return -1;

if (fsync(pStore->fd) < 0) {
uError("failed to fsync file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}

if (close(pStore->fd) < 0) {
uError("failed to close file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}

remove(pStore->fsnap);
return 0;
}

static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
char buf[TD_KVSTORE_HEADER_SIZE] = "\0";

if (lseek(fd, 0, SEEK_SET) < 0) {
uError("failed to lseek file %s since %s", fname, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}

tdEncodeStoreInfo(buf, pInfo);
taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE);
if (twrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
uError("failed to write file %s %d bytes since %s", fname, TD_KVSTORE_HEADER_SIZE, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}

return TSDB_CODE_SUCCESS;
}

static int tdInitKVStoreHeader(int fd, char *fname) {
SStoreInfo info = {TD_KVSTORE_HEADER_SIZE, 0, 0, 0};

return tdUpdateKVStoreHeader(fd, fname, &info);
}

static void *tdEncodeStoreInfo(void *buf, SStoreInfo *pInfo) {
buf = taosEncodeVariantI64(buf, pInfo->size);
buf = taosEncodeVariantI64(buf, pInfo->tombSize);
buf = taosEncodeVariantI64(buf, pInfo->nRecords);
buf = taosEncodeVariantI64(buf, pInfo->nDels);

return buf;
}

// static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) {
//   buf = taosDecodeVariantI64(buf, &(pInfo->size));
//   buf = taosDecodeVariantI64(buf, &(pInfo->tombSize));
//   buf = taosDecodeVariantI64(buf, &(pInfo->nRecords));
//   buf = taosDecodeVariantI64(buf, &(pInfo->nDels));

//   return buf;
// }

static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) {
SKVStore *pStore = (SKVStore *)malloc(sizeof(SKVStore));
if (pStore == NULL) goto _err;

pStore->fname = strdup(fname);
if (pStore->map == NULL) goto _err;

pStore->fsnap = tdGetKVStoreSnapshotFname(fname);
if (pStore->fsnap == NULL) goto _err;

pStore->fnew = tdGetKVStoreNewFname(fname);
if (pStore->fnew == NULL) goto _err;

pStore->fd = -1;
pStore->sfd = -1;
pStore->nfd = -1;
pStore->iFunc = iFunc;
pStore->aFunc = aFunc;
pStore->appH = appH;
pStore->map = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
if (pStore->map == NULL) {
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
goto _err;
}

return pStore;

_err:
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
tdFreeKVStore(pStore);
return NULL;
}

static void tdFreeKVStore(SKVStore *pStore) {
if (pStore) {
tfree(pStore->fname);
tfree(pStore->fsnap);
tfree(pStore->fnew);
taosHashCleanup(pStore->map);
free(pStore);
}
}

static char *tdGetKVStoreSnapshotFname(char *fdata) {
size_t size = strlen(fdata) + strlen(TD_KVSTORE_SNAP_SUFFIX) + 1;
char * fname = malloc(size);
if (fname == NULL) {
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
return NULL;
}
sprintf(fname, "%s%s", fdata, TD_KVSTORE_SNAP_SUFFIX);
return fname;
}

static char *tdGetKVStoreNewFname(char *fdata) {
size_t size = strlen(fdata) + strlen(TD_KVSTORE_NEW_SUFFIX) + 1;
char * fname = malloc(size);
if (fname == NULL) {
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
return NULL;
}
sprintf(fname, "%s%s", fdata, TD_KVSTORE_NEW_SUFFIX);
return fname;
}
@@ -21,14 +21,16 @@
#include <stdlib.h>
#include <string.h>
#include <taos.h> // TAOS header file
#include <unistd.h>

void taosMsleep(int mseconds);
#include <sys/time.h>
#include <inttypes.h>

static int32_t doQuery(TAOS* taos, const char* sql) {
struct timeval t1 = {0};
gettimeofday(&t1, NULL);

TAOS_RES* res = taos_query(taos, sql);
if (taos_errno(res) != 0) {
printf("failed to execute query, reason:%s\n", taos_errstr(taos));
printf("failed to execute query, reason:%s\n", taos_errstr(res));
return -1;
}

@@ -38,13 +40,19 @@ static int32_t doQuery(TAOS* taos, const char* sql) {
int32_t numOfFields = taos_num_fields(res);
TAOS_FIELD* pFields = taos_fetch_fields(res);

int32_t i = 0;
while((row = taos_fetch_row(res)) != NULL) {
taos_print_row(buf, row, pFields, numOfFields);
printf("%s\n", buf);
printf("%d:%s\n", ++i, buf);
memset(buf, 0, 512);
}

taos_free_result(res);

struct timeval t2 = {0};
gettimeofday(&t2, NULL);

printf("elapsed time:%"PRId64 " ms\n", ((t2.tv_sec*1000000 + t2.tv_usec) - (t1.tv_sec*1000000 + t1.tv_usec))/1000);
return 0;
}

@@ -101,14 +109,18 @@ int main(int argc, char *argv[]) {

taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
if (taos == NULL) {
printf("failed to connect to server, reason:%s\n", taos_errstr(taos));
printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
exit(1);
}
printf("success to connect to server\n");

printf("success to connect to server\n");
// doQuery(taos, "select c1,count(*) from group_db0.group_mt0 where c1<8 group by c1");
doQuery(taos, "select * from test.m1");

// multiThreadTest(1, taos);
doQuery(taos, "use test");
doQuery(taos, "alter table tm99 set tag a=99");
// doQuery(taos, "select tbname from test.m1");
// doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 and tbname in ('lm2_tb0') interval(1s) group by t1");
// doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 and tbname in ('lm2_tb0', 'lm2_tb1', 'lm2_tb2') interval(1s)");
// for(int32_t i = 0; i < 100000; ++i) {
//   doQuery(taos, "insert into t1 values(now, 2)");
// }
@@ -0,0 +1,136 @@
#!/bin/bash
ulimit -c unlimited

python3 ./test.py -f insert/basic.py
python3 ./test.py -f insert/int.py
python3 ./test.py -f insert/float.py
python3 ./test.py -f insert/bigint.py
python3 ./test.py -f insert/bool.py
python3 ./test.py -f insert/double.py
python3 ./test.py -f insert/smallint.py
python3 ./test.py -f insert/tinyint.py
python3 ./test.py -f insert/date.py
python3 ./test.py -f insert/binary.py
python3 ./test.py -f insert/nchar.py
# python3 ./test.py -f insert/nchar-boundary.py
# python3 ./test.py -f insert/nchar-unicode.py
python3 ./test.py -f insert/multi.py
python3 ./test.py -f insert/randomNullCommit.py

python3 ./test.py -f table/column_name.py
python3 ./test.py -f table/column_num.py
python3 ./test.py -f table/db_table.py
# python3 ./test.py -f table/tablename-boundary.py

# tag
python3 ./test.py -f tag_lite/filter.py
python3 ./test.py -f tag_lite/create-tags-boundary.py
python3 ./test.py -f tag_lite/3.py
python3 ./test.py -f tag_lite/4.py
python3 ./test.py -f tag_lite/5.py
python3 ./test.py -f tag_lite/6.py
# python3 ./test.py -f tag_lite/add.py
python3 ./test.py -f tag_lite/bigint.py
python3 ./test.py -f tag_lite/binary_binary.py
python3 ./test.py -f tag_lite/binary.py
python3 ./test.py -f tag_lite/bool_binary.py
python3 ./test.py -f tag_lite/bool_int.py
python3 ./test.py -f tag_lite/bool.py
python3 ./test.py -f tag_lite/change.py
python3 ./test.py -f tag_lite/column.py
# python3 ./test.py -f tag_lite/commit.py
python3 ./test.py -f tag_lite/create.py
# python3 ./test.py -f tag_lite/datatype.py
python3 ./test.py -f tag_lite/datatype-without-alter.py
# python3 ./test.py -f tag_lite/delete.py
python3 ./test.py -f tag_lite/double.py
python3 ./test.py -f tag_lite/float.py
python3 ./test.py -f tag_lite/int_binary.py
python3 ./test.py -f tag_lite/int_float.py
python3 ./test.py -f tag_lite/int.py
# python3 ./test.py -f tag_lite/set.py
python3 ./test.py -f tag_lite/smallint.py
python3 ./test.py -f tag_lite/tinyint.py

# python3 ./test.py -f dbmgmt/database-name-boundary.py

python3 ./test.py -f import_merge/importBlock1HO.py
python3 ./test.py -f import_merge/importBlock1HPO.py
python3 ./test.py -f import_merge/importBlock1H.py
python3 ./test.py -f import_merge/importBlock1S.py
python3 ./test.py -f import_merge/importBlock1Sub.py
python3 ./test.py -f import_merge/importBlock1TO.py
python3 ./test.py -f import_merge/importBlock1TPO.py
python3 ./test.py -f import_merge/importBlock1T.py
python3 ./test.py -f import_merge/importBlock2HO.py
python3 ./test.py -f import_merge/importBlock2HPO.py
python3 ./test.py -f import_merge/importBlock2H.py
python3 ./test.py -f import_merge/importBlock2S.py
python3 ./test.py -f import_merge/importBlock2Sub.py
python3 ./test.py -f import_merge/importBlock2TO.py
python3 ./test.py -f import_merge/importBlock2TPO.py
python3 ./test.py -f import_merge/importBlock2T.py
python3 ./test.py -f import_merge/importBlockbetween.py
python3 ./test.py -f import_merge/importCacheFileHO.py
python3 ./test.py -f import_merge/importCacheFileHPO.py
python3 ./test.py -f import_merge/importCacheFileH.py
python3 ./test.py -f import_merge/importCacheFileS.py
python3 ./test.py -f import_merge/importCacheFileSub.py
python3 ./test.py -f import_merge/importCacheFileTO.py
python3 ./test.py -f import_merge/importCacheFileTPO.py
python3 ./test.py -f import_merge/importCacheFileT.py
python3 ./test.py -f import_merge/importDataH2.py
# python3 ./test.py -f import_merge/importDataHO2.py
# python3 ./test.py -f import_merge/importDataHO.py
python3 ./test.py -f import_merge/importDataHPO.py
python3 ./test.py -f import_merge/importDataLastHO.py
python3 ./test.py -f import_merge/importDataLastHPO.py
python3 ./test.py -f import_merge/importDataLastH.py
python3 ./test.py -f import_merge/importDataLastS.py
python3 ./test.py -f import_merge/importDataLastSub.py
python3 ./test.py -f import_merge/importDataLastTO.py
python3 ./test.py -f import_merge/importDataLastTPO.py
python3 ./test.py -f import_merge/importDataLastT.py
python3 ./test.py -f import_merge/importDataS.py
# python3 ./test.py -f import_merge/importDataSub.py
python3 ./test.py -f import_merge/importDataTO.py
python3 ./test.py -f import_merge/importDataTPO.py
python3 ./test.py -f import_merge/importDataT.py
python3 ./test.py -f import_merge/importHeadOverlap.py
python3 ./test.py -f import_merge/importHeadPartOverlap.py
python3 ./test.py -f import_merge/importHead.py
python3 ./test.py -f import_merge/importHORestart.py
python3 ./test.py -f import_merge/importHPORestart.py
python3 ./test.py -f import_merge/importHRestart.py
python3 ./test.py -f import_merge/importLastHO.py
python3 ./test.py -f import_merge/importLastHPO.py
python3 ./test.py -f import_merge/importLastH.py
python3 ./test.py -f import_merge/importLastS.py
python3 ./test.py -f import_merge/importLastSub.py
python3 ./test.py -f import_merge/importLastTO.py
python3 ./test.py -f import_merge/importLastTPO.py
python3 ./test.py -f import_merge/importLastT.py
python3 ./test.py -f import_merge/importSpan.py
python3 ./test.py -f import_merge/importSRestart.py
python3 ./test.py -f import_merge/importSubRestart.py
python3 ./test.py -f import_merge/importTailOverlap.py
python3 ./test.py -f import_merge/importTailPartOverlap.py
python3 ./test.py -f import_merge/importTail.py
python3 ./test.py -f import_merge/importToCommit.py
python3 ./test.py -f import_merge/importTORestart.py
python3 ./test.py -f import_merge/importTPORestart.py
python3 ./test.py -f import_merge/importTRestart.py
python3 ./test.py -f import_merge/importInsertThenImport.py

# user
python3 ./test.py -f user/user_create.py
python3 ./test.py -f user/pass_len.py

# table
#python3 ./test.py -f table/del_stable.py

#query
python3 ./test.py -f query/filter.py
# python3 ./test.py -f query/filterCombo.py
# python3 ./test.py -f query/queryNormal.py
# python3 ./test.py -f query/queryError.py
@@ -194,13 +194,13 @@ class TDDnode:
selfPath = os.path.dirname(os.path.realpath(__file__))
binPath = ""

if ("TDinternal" in selfPath):
if ("community" in selfPath):
projPath = selfPath + "/../../../../"

for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("community" not in rootRealPath):
if ("packaging" not in rootRealPath):
binPath = os.path.join(root, "taosd")
break
else:

@@ -213,7 +213,7 @@ class TDDnode:
break

if (binPath == ""):
tdLog.exit("taosd not found!s")
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % rootRealPath)

@@ -180,7 +180,7 @@ if $data03 != 0 then
endi

print $data04
if $data04 != 0.0000 then
if $data04 != 0.00000 then
return -1
endi

@@ -201,7 +201,8 @@ if $data13 != 1 then
return -1
endi

if $data14 != 1.0000 then
if $data14 != 1.00000 then
print expect 1.00000, actual:$data14
return -1
endi
@ -345,6 +346,19 @@ if $data94 != 9 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql select c1,sum(c1),avg(c1),count(*) from group_mt0 where c1<5 group by c1;
|
||||
if $row != 5 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 800 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select first(c1), last(ts), first(ts), last(c1),sum(c1),avg(c1),count(*) from group_mt0 where c1<20 group by tbname,c1;
|
||||
if $row != 160 then
|
||||
return -1
|
||||
|
|
|
@@ -0,0 +1,213 @@
#unsupport run general/alter/cached_schema_after_alter.sim
#unsupport run general/alter/count.sim
#unsupport run general/alter/import.sim
#unsupport run general/alter/insert1.sim
#unsupport run general/alter/insert2.sim
#unsupport run general/alter/metrics.sim
#unsupport run general/alter/table.sim
run general/cache/new_metrics.sim
run general/cache/restart_metrics.sim
run general/cache/restart_table.sim
run general/connection/connection.sim
run general/column/commit.sim
run general/column/metrics.sim
run general/column/table.sim
run general/compress/commitlog.sim
run general/compress/compress.sim
run general/compress/compress2.sim
run general/compress/uncompress.sim
run general/compute/avg.sim
run general/compute/bottom.sim
run general/compute/count.sim
run general/compute/diff.sim
run general/compute/diff2.sim
run general/compute/first.sim
run general/compute/interval.sim
run general/compute/last.sim
run general/compute/leastsquare.sim
run general/compute/max.sim
run general/compute/min.sim
run general/compute/null.sim
run general/compute/percentile.sim
run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/db/alter_option.sim
run general/db/alter_tables_d2.sim
run general/db/alter_tables_v1.sim
run general/db/alter_tables_v4.sim
run general/db/alter_vgroups.sim
run general/db/basic.sim
run general/db/basic1.sim
run general/db/basic2.sim
run general/db/basic3.sim
run general/db/basic4.sim
run general/db/basic5.sim
run general/db/delete_reuse1.sim
run general/db/delete_reuse2.sim
run general/db/delete_reusevnode.sim
run general/db/delete_reusevnode2.sim
run general/db/delete_writing1.sim
run general/db/delete_writing2.sim
run general/db/delete.sim
run general/db/len.sim
run general/db/repeat.sim
run general/db/tables.sim
run general/db/vnodes.sim
run general/field/2.sim
run general/field/3.sim
run general/field/4.sim
run general/field/5.sim
run general/field/6.sim
run general/field/bigint.sim
run general/field/binary.sim
run general/field/bool.sim
run general/field/single.sim
run general/field/smallint.sim
run general/field/tinyint.sim
run general/http/restful.sim
run general/http/restful_insert.sim
run general/http/restful_limit.sim
run general/http/restful_full.sim
run general/http/prepare.sim
run general/http/telegraf.sim
# run general/http/grafana_bug.sim
# run general/http/grafana.sim
run general/import/basic.sim
run general/import/commit.sim
run general/import/large.sim
run general/import/replica1.sim
run general/insert/basic.sim
run general/insert/insert_drop.sim
run general/insert/query_block1_memory.sim
run general/insert/query_block2_memory.sim
run general/insert/query_block1_file.sim
run general/insert/query_block2_file.sim
run general/insert/query_file_memory.sim
run general/insert/query_multi_file.sim
run general/insert/tcp.sim
#unsupport run general/parser/alter.sim
#unsupport run general/parser/alter1.sim
#unsupport run general/parser/alter_stable.sim
run general/parser/auto_create_tb.sim
run general/parser/auto_create_tb_drop_tb.sim
run general/parser/col_arithmetic_operation.sim
run general/parser/columnValue.sim
# run general/parser/commit.sim
run general/parser/create_db.sim
run general/parser/create_mt.sim
run general/parser/create_tb.sim
run general/parser/dbtbnameValidate.sim
run general/parser/import_commit1.sim
run general/parser/import_commit2.sim
run general/parser/import_commit3.sim
run general/parser/insert_tb.sim
# run general/parser/first_last.sim
#unsupport run general/parser/import_file.sim
# run general/parser/lastrow.sim
run general/parser/nchar.sim
#unsupport run general/parser/null_char.sim
# run general/parser/single_row_in_tb.sim
run general/parser/select_from_cache_disk.sim
# run general/parser/limit.sim
# run general/parser/limit1.sim
# run general/parser/limit1_tblocks100.sim
# run general/parser/mixed_blocks.sim
# run general/parser/selectResNum.sim
run general/parser/select_across_vnodes.sim
run general/parser/slimit1.sim
run general/parser/tbnameIn.sim
run general/parser/binary_escapeCharacter.sim
# run general/parser/projection_limit_offset.sim
run general/parser/limit2.sim
# run general/parser/slimit.sim
run general/parser/fill.sim
# run general/parser/fill_stb.sim
# run general/parser/interp.sim
# run general/parser/where.sim
#unsupport run general/parser/join.sim
#unsupport run general/parser/join_multivnode.sim
# run general/parser/select_with_tags.sim
#unsupport run general/parser/groupby.sim
#unsupport run general/parser/bug.sim
#unsupport run general/parser/tags_dynamically_specifiy.sim
#unsupport run general/parser/set_tag_vals.sim
#unsupport run general/parser/repeatAlter.sim
#unsupport run general/parser/slimit_alter_tags.sim
#unsupport run general/parser/stream_on_sys.sim
#unsupport run general/parser/stream.sim
#unsupport run general/parser/repeatStream.sim
run general/stable/disk.sim
run general/stable/dnode3.sim
run general/stable/metrics.sim
run general/stable/values.sim
run general/stable/vnode3.sim
# run general/table/autocreate.sim
run general/table/basic1.sim
run general/table/basic2.sim
run general/table/basic3.sim
run general/table/bigint.sim
run general/table/binary.sim
run general/table/bool.sim
run general/table/column_name.sim
run general/table/column_num.sim
run general/table/column_value.sim
run general/table/column2.sim
run general/table/date.sim
run general/table/db.table.sim
run general/table/delete_reuse1.sim
run general/table/delete_reuse2.sim
run general/table/delete_writing.sim
run general/table/describe.sim
run general/table/double.sim
run general/table/fill.sim
run general/table/float.sim
run general/table/int.sim
run general/table/limit.sim
run general/table/smallint.sim
run general/table/table_len.sim
# run general/table/table.sim
run general/table/tinyint.sim
run general/table/vgroup.sim
run general/tag/3.sim
run general/tag/4.sim
run general/tag/5.sim
run general/tag/6.sim
#unsupport run general/tag/add.sim
run general/tag/bigint.sim
run general/tag/binary_binary.sim
run general/tag/binary.sim
run general/tag/bool_binary.sim
run general/tag/bool_int.sim
run general/tag/bool.sim
#unsupport run general/tag/change.sim
run general/tag/column.sim
#unsupport run general/tag/commit.sim
run general/tag/create.sim
#unsupport run general/tag/delete.sim
run general/tag/double.sim
run general/tag/filter.sim
run general/tag/float.sim
run general/tag/int_binary.sim
run general/tag/int_float.sim
run general/tag/int.sim
#unsupport run general/tag/set.sim
run general/tag/smallint.sim
run general/tag/tinyint.sim
run general/user/authority.sim
run general/user/monitor.sim
run general/user/pass_alter.sim
run general/user/pass_len.sim
run general/user/user_create.sim
run general/user/user_len.sim
run general/vector/metrics_field.sim
run general/vector/metrics_mix.sim
run general/vector/metrics_query.sim
run general/vector/metrics_tag.sim
run general/vector/metrics_time.sim
run general/vector/multi.sim
run general/vector/single.sim
run general/vector/table_field.sim
run general/vector/table_mix.sim
run general/vector/table_query.sim
run general/vector/table_time.sim

@@ -24,14 +24,19 @@ GREEN_DARK='\033[0;32m'
GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m'

echo "### run TSIM script ###"
echo "### run TSIM test case ###"
cd script

[ -f out.log ] && rm -f out.log

if [ "$1" == "cron" ]; then
echo "### run TSIM regression test ###"
runSimCaseOneByOne regressionSuite.sim
elif [ "$1" == "full" ]; then
echo "### run TSIM full test ###"
runSimCaseOneByOne fullGeneralSuite.sim
else
echo "### run TSIM smoke test ###"
runSimCaseOneByOne basicSuite.sim
fi

@@ -53,14 +58,19 @@ if [ "$totalFailed" -ne "0" ]; then
# exit $totalFailed
fi

echo "### run Python script ###"
echo "### run Python test case ###"
cd ../pytest

[ -f pytest-out.log ] && rm -f pytest-out.log

if [ "$1" == "cron" ]; then
echo "### run Python regression test ###"
runPyCaseOneByOne regressiontest.sh
elif [ "$1" == "full" ]; then
echo "### run Python full test ###"
runPyCaseOneByOne fulltest.sh
else
echo "### run Python smoke test ###"
runPyCaseOneByOne smoketest.sh
fi
totalPySuccess=`grep 'successfully executed' pytest-out.log | wc -l`
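
runSimCaseOneByOne and runPyCaseOneByOne are helper functions defined earlier in test-all.sh and are not shown in this hunk. Purely as a sketch of the pattern (the "successfully executed" marker mirrors the grep above; the function name, file handling, and everything else are assumptions), the per-case loop could look roughly like this in Python:

# Hypothetical sketch of a one-by-one case runner; not the script's actual helper.
import subprocess

def run_cases_one_by_one(case_list="fulltest.sh", log_file="pytest-out.log"):
    successes = 0
    with open(case_list) as cases, open(log_file, "a") as log:
        for line in cases:
            line = line.strip()
            if not line or line.startswith("#"):
                continue  # skip blanks and commented-out cases
            result = subprocess.run(line, shell=True, stdout=log, stderr=subprocess.STDOUT)
            if result.returncode == 0:
                log.write("successfully executed\n")
                successes += 1
    return successes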