Merge remote-tracking branch 'origin/develop' into feature/wal
This commit is contained in:
commit
3679e3724b
|
@ -75,6 +75,7 @@ typedef struct SJoinSupporter {
|
||||||
SArray* exprList;
|
SArray* exprList;
|
||||||
SFieldInfo fieldsInfo;
|
SFieldInfo fieldsInfo;
|
||||||
STagCond tagCond;
|
STagCond tagCond;
|
||||||
|
SSqlGroupbyExpr groupInfo; // group by info
|
||||||
struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array
|
struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array
|
||||||
FILE* f; // temporary file in order to create TSBuf
|
FILE* f; // temporary file in order to create TSBuf
|
||||||
char path[PATH_MAX]; // temporary file path, todo dynamic allocate memory
|
char path[PATH_MAX]; // temporary file path, todo dynamic allocate memory
|
||||||
|
@ -225,7 +226,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo);
|
||||||
|
|
||||||
void tscClearSubqueryInfo(SSqlCmd* pCmd);
|
void tscClearSubqueryInfo(SSqlCmd* pCmd);
|
||||||
void tscFreeVgroupTableInfo(SArray* pVgroupTables);
|
void tscFreeVgroupTableInfo(SArray* pVgroupTables);
|
||||||
SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables);
|
SArray* tscVgroupTableInfoClone(SArray* pVgroupTables);
|
||||||
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index);
|
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index);
|
||||||
|
|
||||||
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
|
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
|
||||||
|
@ -265,6 +266,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t sub
|
||||||
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
|
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
|
||||||
|
|
||||||
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
|
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
|
||||||
|
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId);
|
||||||
|
|
||||||
void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
|
void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
|
||||||
|
|
||||||
|
|
|
@ -77,7 +77,7 @@ SSchema *tscGetTableColumnSchema(const STableMeta *pMeta, int32_t colIndex);
|
||||||
* @param colId
|
* @param colId
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
SSchema* tscGetTableColumnSchemaById(STableMeta* pTableMeta, int16_t colId);
|
SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* check if the schema is valid or not, including following aspects:
|
* check if the schema is valid or not, including following aspects:
|
||||||
|
|
|
@ -128,7 +128,7 @@ typedef struct STableMetaInfo {
|
||||||
typedef struct SSqlExpr {
|
typedef struct SSqlExpr {
|
||||||
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
|
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
|
||||||
SColIndex colInfo;
|
SColIndex colInfo;
|
||||||
int64_t uid; // refactor use the pointer
|
uint64_t uid; // refactor use the pointer
|
||||||
int16_t functionId; // function id in aAgg array
|
int16_t functionId; // function id in aAgg array
|
||||||
int16_t resType; // return value type
|
int16_t resType; // return value type
|
||||||
int16_t resBytes; // length of return value
|
int16_t resBytes; // length of return value
|
||||||
|
|
|
@ -405,7 +405,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
||||||
SSqlRes *pRes = &pSql->res;
|
SSqlRes *pRes = &pSql->res;
|
||||||
pRes->code = code;
|
pRes->code = code;
|
||||||
|
|
||||||
const char* msg = (pCmd->command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta";
|
SSqlObj *sub = (SSqlObj*) res;
|
||||||
|
const char* msg = (sub->cmd.command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta";
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tscError("%p get %s failed, code:%s", pSql, msg, tstrerror(code));
|
tscError("%p get %s failed, code:%s", pSql, msg, tstrerror(code));
|
||||||
goto _error;
|
goto _error;
|
||||||
|
|
|
@ -1253,10 +1253,11 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) { // reset output buffer to the beginning
|
void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {// reset output buffer to the beginning
|
||||||
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
|
size_t t = tscSqlExprNumOfExprs(pQueryInfo);
|
||||||
pLocalReducer->pCtx[i].aOutputBuf =
|
for (int32_t i = 0; i < t; ++i) {
|
||||||
pLocalReducer->pResultBuf->data + tscFieldInfoGetOffset(pQueryInfo, i) * pLocalReducer->resColModel->capacity;
|
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
|
||||||
|
pLocalReducer->pCtx[i].aOutputBuf = pLocalReducer->pResultBuf->data + pExpr->offset * pLocalReducer->resColModel->capacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(pLocalReducer->pResultBuf, 0, pLocalReducer->nResultBufSize + sizeof(tFilePage));
|
memset(pLocalReducer->pResultBuf, 0, pLocalReducer->nResultBufSize + sizeof(tFilePage));
|
||||||
|
@ -1501,8 +1502,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
|
||||||
if (pLocalReducer->discard && sameGroup) {
|
if (pLocalReducer->discard && sameGroup) {
|
||||||
pLocalReducer->hasUnprocessedRow = false;
|
pLocalReducer->hasUnprocessedRow = false;
|
||||||
tmpBuffer->num = 0;
|
tmpBuffer->num = 0;
|
||||||
} else {
|
} else { // current row does not belongs to the previous group, so it is not be handled yet.
|
||||||
// current row does not belongs to the previous group, so it is not be handled yet.
|
|
||||||
pLocalReducer->hasUnprocessedRow = true;
|
pLocalReducer->hasUnprocessedRow = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2728,7 +2728,6 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery) {
|
||||||
int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* pCmd) {
|
int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* pCmd) {
|
||||||
const char* msg1 = "too many columns in group by clause";
|
const char* msg1 = "too many columns in group by clause";
|
||||||
const char* msg2 = "invalid column name in group by clause";
|
const char* msg2 = "invalid column name in group by clause";
|
||||||
// const char* msg3 = "group by columns must belong to one table";
|
|
||||||
const char* msg7 = "not support group by expression";
|
const char* msg7 = "not support group by expression";
|
||||||
const char* msg8 = "not allowed column type for group by";
|
const char* msg8 = "not allowed column type for group by";
|
||||||
const char* msg9 = "tags not allowed for table query";
|
const char* msg9 = "tags not allowed for table query";
|
||||||
|
@ -2803,7 +2802,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
|
||||||
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
|
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
|
||||||
} else {
|
} else {
|
||||||
// check if the column type is valid, here only support the bool/tinyint/smallint/bigint group by
|
// check if the column type is valid, here only support the bool/tinyint/smallint/bigint group by
|
||||||
if (pSchema->type > TSDB_DATA_TYPE_BINARY) {
|
if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP || pSchema->type == TSDB_DATA_TYPE_FLOAT || pSchema->type == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg8);
|
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg8);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5281,20 +5280,26 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau
|
||||||
|
|
||||||
if (pParentQueryInfo->groupbyExpr.numOfGroupCols > 0) {
|
if (pParentQueryInfo->groupbyExpr.numOfGroupCols > 0) {
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, subClauseIndex);
|
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, subClauseIndex);
|
||||||
|
SSqlExpr* pExpr = NULL;
|
||||||
|
|
||||||
size_t size = taosArrayGetSize(pQueryInfo->exprList);
|
size_t size = taosArrayGetSize(pQueryInfo->exprList);
|
||||||
|
if (size > 0) {
|
||||||
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, (int32_t)size - 1);
|
pExpr = tscSqlExprGet(pQueryInfo, (int32_t)size - 1);
|
||||||
|
}
|
||||||
|
|
||||||
if (pExpr->functionId != TSDB_FUNC_TAG) {
|
if (pExpr == NULL || pExpr->functionId != TSDB_FUNC_TAG) {
|
||||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
|
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pParentQueryInfo, tableIndex);
|
||||||
int16_t columnInfo = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
|
|
||||||
SColumnIndex index = {.tableIndex = 0, .columnIndex = columnInfo};
|
int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
|
||||||
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
|
|
||||||
|
SSchema* pTagSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, colId);
|
||||||
|
int16_t colIndex = tscGetTagColIndexById(pTableMetaInfo->pTableMeta, colId);
|
||||||
|
SColumnIndex index = {.tableIndex = 0, .columnIndex = colIndex};
|
||||||
|
|
||||||
|
char* name = pTagSchema->name;
|
||||||
|
int16_t type = pTagSchema->type;
|
||||||
|
int16_t bytes = pTagSchema->bytes;
|
||||||
|
|
||||||
int16_t type = pSchema[index.columnIndex].type;
|
|
||||||
int16_t bytes = pSchema[index.columnIndex].bytes;
|
|
||||||
char* name = pSchema[index.columnIndex].name;
|
|
||||||
|
|
||||||
pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, bytes, true);
|
pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, bytes, true);
|
||||||
pExpr->colInfo.flag = TSDB_COL_TAG;
|
pExpr->colInfo.flag = TSDB_COL_TAG;
|
||||||
|
|
||||||
|
|
|
@ -118,7 +118,7 @@ SSchema* tscGetTableColumnSchema(const STableMeta* pTableMeta, int32_t colIndex)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO for large number of columns, employ the binary search method
|
// TODO for large number of columns, employ the binary search method
|
||||||
SSchema* tscGetTableColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
|
SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
|
||||||
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
|
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
|
||||||
|
|
||||||
for(int32_t i = 0; i < tinfo.numOfColumns + tinfo.numOfTags; ++i) {
|
for(int32_t i = 0; i < tinfo.numOfColumns + tinfo.numOfTags; ++i) {
|
||||||
|
|
|
@ -150,7 +150,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
||||||
if (pObj == NULL) return;
|
if (pObj == NULL) return;
|
||||||
|
|
||||||
if (pObj != pObj->signature) {
|
if (pObj != pObj->signature) {
|
||||||
tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
|
tscError("heartbeat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -175,12 +175,12 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
||||||
if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
|
if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tscDebug("heartbeat failed, code:%s", tstrerror(code));
|
tscDebug("%p heartbeat failed, code:%s", pObj->pHb, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pObj->pHb != NULL) {
|
if (pObj->pHb != NULL) {
|
||||||
int32_t waitingDuring = tsShellActivityTimer * 500;
|
int32_t waitingDuring = tsShellActivityTimer * 500;
|
||||||
tscDebug("%p start heartbeat in %dms", pSql, waitingDuring);
|
tscDebug("%p send heartbeat in %dms", pSql, waitingDuring);
|
||||||
|
|
||||||
taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer);
|
taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer);
|
||||||
} else {
|
} else {
|
||||||
|
@ -208,6 +208,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
|
||||||
|
|
||||||
assert(*pHB->self == pHB);
|
assert(*pHB->self == pHB);
|
||||||
|
|
||||||
|
pHB->retry = 0;
|
||||||
int32_t code = tscProcessSql(pHB);
|
int32_t code = tscProcessSql(pHB);
|
||||||
taosCacheRelease(tscObjCache, (void**) &p, false);
|
taosCacheRelease(tscObjCache, (void**) &p, false);
|
||||||
|
|
||||||
|
@ -552,11 +553,28 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
|
||||||
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
|
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
|
||||||
|
|
||||||
int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
|
int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
|
||||||
|
|
||||||
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
|
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
|
||||||
int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs);
|
int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs);
|
||||||
|
|
||||||
return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
|
int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0;
|
||||||
|
|
||||||
|
int32_t tableSerialize = 0;
|
||||||
|
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||||
|
if (pTableMetaInfo->pVgroupTables != NULL) {
|
||||||
|
size_t numOfGroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
|
||||||
|
|
||||||
|
int32_t totalTables = 0;
|
||||||
|
for (int32_t i = 0; i < numOfGroups; ++i) {
|
||||||
|
SVgroupTableInfo *pTableInfo = taosArrayGet(pTableMetaInfo->pVgroupTables, i);
|
||||||
|
totalTables += (int32_t) taosArrayGetSize(pTableInfo->itemList);
|
||||||
|
}
|
||||||
|
|
||||||
|
tableSerialize = totalTables * sizeof(STableIdInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + tsBufSize +
|
||||||
|
tableSerialize + 4096;
|
||||||
}
|
}
|
||||||
|
|
||||||
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
|
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
|
||||||
|
@ -1641,11 +1659,14 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
|
int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
|
||||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
|
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
|
||||||
pthread_mutex_unlock(&pObj->mutex);
|
pthread_mutex_unlock(&pObj->mutex);
|
||||||
tscError("%p failed to malloc for heartbeat msg", pSql);
|
tscError("%p failed to create heartbeat msg", pSql);
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO the expired hb and client can not be identified by server till now.
|
||||||
SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
|
SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
|
||||||
|
tstrncpy(pHeartbeat->clientVer, version, tListLen(pHeartbeat->clientVer));
|
||||||
|
|
||||||
pHeartbeat->numOfQueries = numOfQueries;
|
pHeartbeat->numOfQueries = numOfQueries;
|
||||||
pHeartbeat->numOfStreams = numOfStreams;
|
pHeartbeat->numOfStreams = numOfStreams;
|
||||||
|
|
||||||
|
@ -1998,10 +2019,11 @@ static void createHBObj(STscObj* pObj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int tscProcessConnectRsp(SSqlObj *pSql) {
|
int tscProcessConnectRsp(SSqlObj *pSql) {
|
||||||
char temp[TSDB_TABLE_FNAME_LEN * 2];
|
|
||||||
STscObj *pObj = pSql->pTscObj;
|
STscObj *pObj = pSql->pTscObj;
|
||||||
SSqlRes *pRes = &pSql->res;
|
SSqlRes *pRes = &pSql->res;
|
||||||
|
|
||||||
|
char temp[TSDB_TABLE_FNAME_LEN * 2] = {0};
|
||||||
|
|
||||||
SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
|
SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
|
||||||
tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId)); // copy acctId from response
|
tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId)); // copy acctId from response
|
||||||
int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);
|
int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);
|
||||||
|
@ -2020,6 +2042,8 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
|
||||||
pObj->connId = htonl(pConnect->connId);
|
pObj->connId = htonl(pConnect->connId);
|
||||||
|
|
||||||
createHBObj(pObj);
|
createHBObj(pObj);
|
||||||
|
|
||||||
|
//launch a timer to send heartbeat to maintain the connection and send status to mnode
|
||||||
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
|
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -320,11 +320,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
|
||||||
pQueryInfo->colList = pSupporter->colList;
|
pQueryInfo->colList = pSupporter->colList;
|
||||||
pQueryInfo->exprList = pSupporter->exprList;
|
pQueryInfo->exprList = pSupporter->exprList;
|
||||||
pQueryInfo->fieldsInfo = pSupporter->fieldsInfo;
|
pQueryInfo->fieldsInfo = pSupporter->fieldsInfo;
|
||||||
|
pQueryInfo->groupbyExpr = pSupporter->groupInfo;
|
||||||
|
|
||||||
pSupporter->exprList = NULL;
|
|
||||||
pSupporter->colList = NULL;
|
|
||||||
memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo));
|
|
||||||
|
|
||||||
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
|
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
|
||||||
assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
|
assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
|
||||||
|
|
||||||
|
@ -332,7 +329,12 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
|
||||||
|
|
||||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
|
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
|
||||||
pTableMetaInfo->pVgroupTables = pSupporter->pVgroupTables;
|
pTableMetaInfo->pVgroupTables = pSupporter->pVgroupTables;
|
||||||
|
|
||||||
|
pSupporter->exprList = NULL;
|
||||||
|
pSupporter->colList = NULL;
|
||||||
pSupporter->pVgroupTables = NULL;
|
pSupporter->pVgroupTables = NULL;
|
||||||
|
memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo));
|
||||||
|
memset(&pSupporter->groupInfo, 0, sizeof(SSqlGroupbyExpr));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* When handling the projection query, the offset value will be modified for table-table join, which is changed
|
* When handling the projection query, the offset value will be modified for table-table join, which is changed
|
||||||
|
@ -460,18 +462,36 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tscCompareTidTags(const void* p1, const void* p2) {
|
int32_t tidTagsCompar(const void* p1, const void* p2) {
|
||||||
const STidTags* t1 = (const STidTags*) varDataVal(p1);
|
const STidTags* t1 = (const STidTags*) (p1);
|
||||||
const STidTags* t2 = (const STidTags*) varDataVal(p2);
|
const STidTags* t2 = (const STidTags*) (p2);
|
||||||
|
|
||||||
if (t1->vgId != t2->vgId) {
|
if (t1->vgId != t2->vgId) {
|
||||||
return (t1->vgId > t2->vgId) ? 1 : -1;
|
return (t1->vgId > t2->vgId) ? 1 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (t1->tid != t2->tid) {
|
tstr* tag1 = (tstr*) t1->tag;
|
||||||
return (t1->tid > t2->tid) ? 1 : -1;
|
tstr* tag2 = (tstr*) t2->tag;
|
||||||
|
|
||||||
|
if (tag1->len != tag2->len) {
|
||||||
|
return (tag1->len > tag2->len)? 1: -1;
|
||||||
}
|
}
|
||||||
return 0;
|
|
||||||
|
return strncmp(tag1->data, tag2->data, tag1->len);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tagValCompar(const void* p1, const void* p2) {
|
||||||
|
const STidTags* t1 = (const STidTags*) varDataVal(p1);
|
||||||
|
const STidTags* t2 = (const STidTags*) varDataVal(p2);
|
||||||
|
|
||||||
|
tstr* tag1 = (tstr*) t1->tag;
|
||||||
|
tstr* tag2 = (tstr*) t2->tag;
|
||||||
|
|
||||||
|
if (tag1->len != tag2->len) {
|
||||||
|
return (tag1->len > tag2->len)? 1: -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return strncmp(tag1->data, tag2->data, tag1->len);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) {
|
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) {
|
||||||
|
@ -587,17 +607,19 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
|
||||||
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
|
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
|
||||||
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
|
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
|
||||||
|
|
||||||
qsort(p1->pIdTagList, p1->num, p1->tagSize, tscCompareTidTags);
|
// sort according to the tag value
|
||||||
qsort(p2->pIdTagList, p2->num, p2->tagSize, tscCompareTidTags);
|
qsort(p1->pIdTagList, p1->num, p1->tagSize, tagValCompar);
|
||||||
|
qsort(p2->pIdTagList, p2->num, p2->tagSize, tagValCompar);
|
||||||
|
|
||||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||||
int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
|
int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
|
||||||
|
|
||||||
SSchema* pColSchema = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
|
SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
|
||||||
|
|
||||||
// int16_t for padding
|
// int16_t for padding
|
||||||
*s1 = taosArrayInit(p1->num, p1->tagSize - sizeof(int16_t));
|
int32_t size = p1->tagSize - sizeof(int16_t);
|
||||||
*s2 = taosArrayInit(p2->num, p2->tagSize - sizeof(int16_t));
|
*s1 = taosArrayInit(p1->num, size);
|
||||||
|
*s2 = taosArrayInit(p2->num, size);
|
||||||
|
|
||||||
if (!(checkForDuplicateTagVal(pColSchema, p1, pParentSql) && checkForDuplicateTagVal(pColSchema, p2, pParentSql))) {
|
if (!(checkForDuplicateTagVal(pColSchema, p1, pParentSql) && checkForDuplicateTagVal(pColSchema, p2, pParentSql))) {
|
||||||
return TSDB_CODE_QRY_DUP_JOIN_KEY;
|
return TSDB_CODE_QRY_DUP_JOIN_KEY;
|
||||||
|
@ -625,6 +647,14 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reorganize the tid-tag value according to both the vgroup id and tag values
|
||||||
|
// sort according to the tag value
|
||||||
|
size_t t1 = taosArrayGetSize(*s1);
|
||||||
|
size_t t2 = taosArrayGetSize(*s2);
|
||||||
|
|
||||||
|
qsort((*s1)->pData, t1, size, tidTagsCompar);
|
||||||
|
qsort((*s2)->pData, t2, size, tidTagsCompar);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -744,10 +774,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
|
||||||
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2);
|
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2);
|
||||||
|
|
||||||
SSqlObj* psub1 = pParentSql->pSubs[0];
|
SSqlObj* psub1 = pParentSql->pSubs[0];
|
||||||
((SJoinSupporter*)psub1->param)->pVgroupTables = tscCloneVgroupTableInfo(pTableMetaInfo1->pVgroupTables);
|
((SJoinSupporter*)psub1->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo1->pVgroupTables);
|
||||||
|
|
||||||
SSqlObj* psub2 = pParentSql->pSubs[1];
|
SSqlObj* psub2 = pParentSql->pSubs[1];
|
||||||
((SJoinSupporter*)psub2->param)->pVgroupTables = tscCloneVgroupTableInfo(pTableMetaInfo2->pVgroupTables);
|
((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo2->pVgroupTables);
|
||||||
|
|
||||||
pParentSql->subState.numOfSub = 2;
|
pParentSql->subState.numOfSub = 2;
|
||||||
pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub;
|
pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub;
|
||||||
|
@ -1313,6 +1343,9 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pSupporter->groupInfo = pNewQueryInfo->groupbyExpr;
|
||||||
|
memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));
|
||||||
|
|
||||||
pNew->cmd.numOfCols = 0;
|
pNew->cmd.numOfCols = 0;
|
||||||
pNewQueryInfo->interval.interval = 0;
|
pNewQueryInfo->interval.interval = 0;
|
||||||
pSupporter->limit = pNewQueryInfo->limit;
|
pSupporter->limit = pNewQueryInfo->limit;
|
||||||
|
@ -1333,17 +1366,9 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
|
||||||
assert(pTagCond->joinInfo.hasJoin);
|
assert(pTagCond->joinInfo.hasJoin);
|
||||||
|
|
||||||
int32_t tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->id.uid);
|
int32_t tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->id.uid);
|
||||||
SSchema* s = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
|
SSchema* s = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
|
||||||
|
|
||||||
// get the tag colId column index
|
colIndex.columnIndex = tscGetTagColIndexById(pTableMetaInfo->pTableMeta, tagColId);
|
||||||
int32_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta);
|
|
||||||
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
|
|
||||||
for(int32_t i = 0; i < numOfTags; ++i) {
|
|
||||||
if (pSchema[i].colId == tagColId) {
|
|
||||||
colIndex.columnIndex = i;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int16_t bytes = 0;
|
int16_t bytes = 0;
|
||||||
int16_t type = 0;
|
int16_t type = 0;
|
||||||
|
@ -2165,7 +2190,8 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
|
||||||
numOfRes = (int32_t)(MIN(numOfRes, remain));
|
numOfRes = (int32_t)(MIN(numOfRes, remain));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numOfRes == 0) {
|
if (numOfRes == 0) { // no result any more, free all subquery objects
|
||||||
|
freeJoinSubqueryObj(pSql);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -80,7 +80,7 @@ int32_t tscInitRpc(const char *user, const char *secretEncrypt, void **pDnodeCon
|
||||||
|
|
||||||
|
|
||||||
void taos_init_imp(void) {
|
void taos_init_imp(void) {
|
||||||
char temp[128];
|
char temp[128] = {0};
|
||||||
|
|
||||||
errno = TSDB_CODE_SUCCESS;
|
errno = TSDB_CODE_SUCCESS;
|
||||||
srand(taosGetTimestampSec());
|
srand(taosGetTimestampSec());
|
||||||
|
@ -149,30 +149,42 @@ void taos_init_imp(void) {
|
||||||
|
|
||||||
tscRefId = taosOpenRef(200, tscCloseTscObj);
|
tscRefId = taosOpenRef(200, tscCloseTscObj);
|
||||||
|
|
||||||
|
// in other language APIs, taos_cleanup is not available yet.
|
||||||
|
// So, to make sure taos_cleanup will be invoked to clean up the allocated
|
||||||
|
// resource to suppress the valgrind warning.
|
||||||
|
atexit(taos_cleanup);
|
||||||
tscDebug("client is initialized successfully");
|
tscDebug("client is initialized successfully");
|
||||||
}
|
}
|
||||||
|
|
||||||
void taos_init() { pthread_once(&tscinit, taos_init_imp); }
|
void taos_init() { pthread_once(&tscinit, taos_init_imp); }
|
||||||
|
|
||||||
void taos_cleanup() {
|
// this function may be called by user or system, or by both simultaneously.
|
||||||
if (tscMetaCache != NULL) {
|
void taos_cleanup(void) {
|
||||||
taosCacheCleanup(tscMetaCache);
|
tscDebug("start to cleanup client environment");
|
||||||
tscMetaCache = NULL;
|
|
||||||
|
|
||||||
taosCacheCleanup(tscObjCache);
|
void* m = tscMetaCache;
|
||||||
tscObjCache = NULL;
|
if (m != NULL && atomic_val_compare_exchange_ptr(&tscMetaCache, m, 0) == m) {
|
||||||
|
taosCacheCleanup(m);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tscQhandle != NULL) {
|
m = tscObjCache;
|
||||||
taosCleanUpScheduler(tscQhandle);
|
if (m != NULL && atomic_val_compare_exchange_ptr(&tscObjCache, m, 0) == m) {
|
||||||
tscQhandle = NULL;
|
taosCacheCleanup(m);
|
||||||
|
}
|
||||||
|
|
||||||
|
m = tscQhandle;
|
||||||
|
if (m != NULL && atomic_val_compare_exchange_ptr(&tscQhandle, m, 0) == m) {
|
||||||
|
taosCleanUpScheduler(m);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCloseRef(tscRefId);
|
taosCloseRef(tscRefId);
|
||||||
taosCleanupKeywordsTable();
|
taosCleanupKeywordsTable();
|
||||||
taosCloseLog();
|
taosCloseLog();
|
||||||
|
|
||||||
taosTmrCleanUp(tscTmr);
|
m = tscTmr;
|
||||||
|
if (m != NULL && atomic_val_compare_exchange_ptr(&tscTmr, m, 0) == m) {
|
||||||
|
taosTmrCleanUp(m);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
|
static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
|
||||||
|
|
|
@ -1665,6 +1665,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
|
||||||
if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
|
if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
|
||||||
taosArrayDestroy(pQueryInfo->groupbyExpr.columnInfo);
|
taosArrayDestroy(pQueryInfo->groupbyExpr.columnInfo);
|
||||||
pQueryInfo->groupbyExpr.columnInfo = NULL;
|
pQueryInfo->groupbyExpr.columnInfo = NULL;
|
||||||
|
pQueryInfo->groupbyExpr.numOfGroupCols = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
pQueryInfo->tsBuf = tsBufDestroy(pQueryInfo->tsBuf);
|
pQueryInfo->tsBuf = tsBufDestroy(pQueryInfo->tsBuf);
|
||||||
|
@ -1713,7 +1714,7 @@ void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) {
|
||||||
taosArrayRemove(pVgroupTable, index);
|
taosArrayRemove(pVgroupTable, index);
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables) {
|
SArray* tscVgroupTableInfoClone(SArray* pVgroupTables) {
|
||||||
if (pVgroupTables == NULL) {
|
if (pVgroupTables == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -1739,7 +1740,7 @@ SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
|
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
|
||||||
tscDebug("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables);
|
tscDebug("%p unref %d tables in the tableMeta cache", address, pQueryInfo->numOfTables);
|
||||||
|
|
||||||
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
|
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
|
||||||
|
@ -1779,6 +1780,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
|
||||||
pTableMetaInfo->vgroupList = tscVgroupInfoClone(vgroupList);
|
pTableMetaInfo->vgroupList = tscVgroupInfoClone(vgroupList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO handle malloc failure
|
||||||
pTableMetaInfo->tagColList = taosArrayInit(4, POINTER_BYTES);
|
pTableMetaInfo->tagColList = taosArrayInit(4, POINTER_BYTES);
|
||||||
if (pTableMetaInfo->tagColList == NULL) {
|
if (pTableMetaInfo->tagColList == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1788,7 +1790,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
|
||||||
tscColumnListCopy(pTableMetaInfo->tagColList, pTagCols, -1);
|
tscColumnListCopy(pTableMetaInfo->tagColList, pTagCols, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableMetaInfo->pVgroupTables = tscCloneVgroupTableInfo(pVgroupTables);
|
pTableMetaInfo->pVgroupTables = tscVgroupTableInfoClone(pVgroupTables);
|
||||||
|
|
||||||
pQueryInfo->numOfTables += 1;
|
pQueryInfo->numOfTables += 1;
|
||||||
return pTableMetaInfo;
|
return pTableMetaInfo;
|
||||||
|
@ -2155,6 +2157,19 @@ int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId) {
|
||||||
|
int32_t numOfTags = tscGetNumOfColumns(pTableMeta);
|
||||||
|
|
||||||
|
SSchema* pSchema = tscGetTableTagSchema(pTableMeta);
|
||||||
|
for(int32_t i = 0; i < numOfTags; ++i) {
|
||||||
|
if (pSchema[i].colId == colId) {
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
bool tscIsUpdateQuery(SSqlObj* pSql) {
|
bool tscIsUpdateQuery(SSqlObj* pSql) {
|
||||||
if (pSql == NULL || pSql->signature != pSql) {
|
if (pSql == NULL || pSql->signature != pSql) {
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
@ -2469,6 +2484,7 @@ void tscSCMVgroupInfoCopy(SCMVgroupInfo* dst, const SCMVgroupInfo* src) {
|
||||||
dst->vgId = src->vgId;
|
dst->vgId = src->vgId;
|
||||||
dst->numOfEps = src->numOfEps;
|
dst->numOfEps = src->numOfEps;
|
||||||
for(int32_t i = 0; i < dst->numOfEps; ++i) {
|
for(int32_t i = 0; i < dst->numOfEps; ++i) {
|
||||||
|
taosTFree(dst->epAddr[i].fqdn);
|
||||||
dst->epAddr[i].port = src->epAddr[i].port;
|
dst->epAddr[i].port = src->epAddr[i].port;
|
||||||
dst->epAddr[i].fqdn = strdup(src->epAddr[i].fqdn);
|
dst->epAddr[i].fqdn = strdup(src->epAddr[i].fqdn);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
|
Subproject commit 8d7bf743852897110cbdcc7c4322cd7a74d4167b
|
|
@ -64,7 +64,7 @@ typedef struct taosField {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
DLL_EXPORT void taos_init();
|
DLL_EXPORT void taos_init();
|
||||||
DLL_EXPORT void taos_cleanup();
|
DLL_EXPORT void taos_cleanup(void);
|
||||||
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
|
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
|
||||||
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
|
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
|
||||||
DLL_EXPORT void taos_close(TAOS *taos);
|
DLL_EXPORT void taos_close(TAOS *taos);
|
||||||
|
|
|
@ -106,6 +106,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" )
|
||||||
|
|
||||||
|
|
||||||
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "network-test" )
|
||||||
|
|
||||||
|
|
||||||
#ifndef TAOS_MESSAGE_C
|
#ifndef TAOS_MESSAGE_C
|
||||||
TSDB_MSG_TYPE_MAX // 105
|
TSDB_MSG_TYPE_MAX // 105
|
||||||
#endif
|
#endif
|
||||||
|
@ -781,6 +785,7 @@ typedef struct {
|
||||||
} SStreamDesc;
|
} SStreamDesc;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
char clientVer[TSDB_VERSION_LEN];
|
||||||
uint32_t connId;
|
uint32_t connId;
|
||||||
int32_t pid;
|
int32_t pid;
|
||||||
int32_t numOfQueries;
|
int32_t numOfQueries;
|
||||||
|
|
|
@ -80,7 +80,10 @@ int main(int argc, char* argv[]) {
|
||||||
shellParseArgument(argc, argv, &args);
|
shellParseArgument(argc, argv, &args);
|
||||||
|
|
||||||
if (args.netTestRole && args.netTestRole[0] != 0) {
|
if (args.netTestRole && args.netTestRole[0] != 0) {
|
||||||
taosNetTest(args.host, (uint16_t)args.port, (uint16_t)args.endPort, args.pktLen, args.netTestRole);
|
taos_init();
|
||||||
|
CmdArguments cmdArgs;
|
||||||
|
memcpy(&cmdArgs, &args, sizeof(SShellArguments));
|
||||||
|
taosNetTest(&cmdArgs);
|
||||||
exit(0);
|
exit(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -232,12 +232,16 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
|
static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
|
||||||
SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp));
|
SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp));
|
||||||
if (pHBRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCMHeartBeatMsg *pHBMsg = pMsg->rpcMsg.pCont;
|
SCMHeartBeatMsg *pHBMsg = pMsg->rpcMsg.pCont;
|
||||||
|
if (taosCheckVersion(pHBMsg->clientVer, version, 3) != TSDB_CODE_SUCCESS) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_VERSION; // todo change the error code
|
||||||
|
}
|
||||||
|
|
||||||
SRpcConnInfo connInfo = {0};
|
SRpcConnInfo connInfo = {0};
|
||||||
rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo);
|
rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo);
|
||||||
|
|
||||||
|
@ -251,33 +255,33 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
// do not close existing links, otherwise
|
// do not close existing links, otherwise
|
||||||
// mError("failed to create connId, close connect");
|
// mError("failed to create connId, close connect");
|
||||||
// pHBRsp->killConnection = 1;
|
// pRsp->killConnection = 1;
|
||||||
} else {
|
} else {
|
||||||
pHBRsp->connId = htonl(pConn->connId);
|
pRsp->connId = htonl(pConn->connId);
|
||||||
mnodeSaveQueryStreamList(pConn, pHBMsg);
|
mnodeSaveQueryStreamList(pConn, pHBMsg);
|
||||||
|
|
||||||
if (pConn->killed != 0) {
|
if (pConn->killed != 0) {
|
||||||
pHBRsp->killConnection = 1;
|
pRsp->killConnection = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pConn->streamId != 0) {
|
if (pConn->streamId != 0) {
|
||||||
pHBRsp->streamId = htonl(pConn->streamId);
|
pRsp->streamId = htonl(pConn->streamId);
|
||||||
pConn->streamId = 0;
|
pConn->streamId = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pConn->queryId != 0) {
|
if (pConn->queryId != 0) {
|
||||||
pHBRsp->queryId = htonl(pConn->queryId);
|
pRsp->queryId = htonl(pConn->queryId);
|
||||||
pConn->queryId = 0;
|
pConn->queryId = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pHBRsp->onlineDnodes = htonl(mnodeGetOnlineDnodesNum());
|
pRsp->onlineDnodes = htonl(mnodeGetOnlineDnodesNum());
|
||||||
pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum());
|
pRsp->totalDnodes = htonl(mnodeGetDnodesNum());
|
||||||
mnodeGetMnodeEpSetForShell(&pHBRsp->epSet);
|
mnodeGetMnodeEpSetForShell(&pRsp->epSet);
|
||||||
|
|
||||||
pMsg->rpcRsp.rsp = pHBRsp;
|
pMsg->rpcRsp.rsp = pRsp;
|
||||||
pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp);
|
pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp);
|
||||||
|
|
||||||
mnodeReleaseConn(pConn);
|
mnodeReleaseConn(pConn);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -940,7 +940,6 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
|
||||||
sas->data = calloc(pQuery->numOfCols, POINTER_BYTES);
|
sas->data = calloc(pQuery->numOfCols, POINTER_BYTES);
|
||||||
|
|
||||||
if (sas->data == NULL) {
|
if (sas->data == NULL) {
|
||||||
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
|
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1003,7 +1002,6 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
||||||
|
|
||||||
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
|
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
|
||||||
if (sasArray == NULL) {
|
if (sasArray == NULL) {
|
||||||
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
|
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1277,7 +1275,6 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
||||||
|
|
||||||
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
|
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
|
||||||
if (sasArray == NULL) {
|
if (sasArray == NULL) {
|
||||||
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
|
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1857,8 +1854,11 @@ static bool needReverseScan(SQuery *pQuery) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_LAST_DST) {
|
if (functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_LAST_DST) {
|
||||||
|
// the scan order to acquire the last result of the specified column
|
||||||
int32_t order = (int32_t)pQuery->pSelectExpr[i].base.arg->argValue.i64;
|
int32_t order = (int32_t)pQuery->pSelectExpr[i].base.arg->argValue.i64;
|
||||||
return order != pQuery->order.order;
|
if (order != pQuery->order.order) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3667,7 +3667,6 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
|
||||||
|
|
||||||
// check if query is killed or not
|
// check if query is killed or not
|
||||||
if (IS_QUERY_KILLED(pQInfo)) {
|
if (IS_QUERY_KILLED(pQInfo)) {
|
||||||
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
|
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4310,7 +4309,6 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
||||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||||
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
|
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||||
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
|
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5174,7 +5172,7 @@ static void doSaveContext(SQInfo *pQInfo) {
|
||||||
SWITCH_ORDER(pQuery->order.order);
|
SWITCH_ORDER(pQuery->order.order);
|
||||||
|
|
||||||
if (pRuntimeEnv->pTSBuf != NULL) {
|
if (pRuntimeEnv->pTSBuf != NULL) {
|
||||||
pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order;
|
SWITCH_ORDER(pRuntimeEnv->pTSBuf->cur.order);
|
||||||
}
|
}
|
||||||
|
|
||||||
STsdbQueryCond cond = {
|
STsdbQueryCond cond = {
|
||||||
|
@ -5267,7 +5265,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
||||||
// query error occurred or query is killed, abort current execution
|
// query error occurred or query is killed, abort current execution
|
||||||
if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) {
|
if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) {
|
||||||
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
|
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
|
||||||
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
|
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5289,7 +5286,8 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
||||||
|
|
||||||
if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) {
|
if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) {
|
||||||
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
|
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
|
||||||
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
|
//TODO finalizeQueryResult may cause SEGSEV, since the memory may not allocated yet, add a cleanup function instead
|
||||||
|
// finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5329,7 +5327,6 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
|
||||||
finalizeQueryResult(pRuntimeEnv);
|
finalizeQueryResult(pRuntimeEnv);
|
||||||
|
|
||||||
if (IS_QUERY_KILLED(pQInfo)) {
|
if (IS_QUERY_KILLED(pQInfo)) {
|
||||||
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
|
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -251,16 +251,16 @@ static const char isIdChar[] = {
|
||||||
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, /* 7x */
|
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, /* 7x */
|
||||||
};
|
};
|
||||||
|
|
||||||
static void* KeywordHashTable = NULL;
|
static void* keywordHashTable = NULL;
|
||||||
|
|
||||||
static void doInitKeywordsTable(void) {
|
static void doInitKeywordsTable(void) {
|
||||||
int numOfEntries = tListLen(keywordTable);
|
int numOfEntries = tListLen(keywordTable);
|
||||||
|
|
||||||
KeywordHashTable = taosHashInit(numOfEntries, MurmurHash3_32, true, false);
|
keywordHashTable = taosHashInit(numOfEntries, MurmurHash3_32, true, false);
|
||||||
for (int32_t i = 0; i < numOfEntries; i++) {
|
for (int32_t i = 0; i < numOfEntries; i++) {
|
||||||
keywordTable[i].len = (uint8_t)strlen(keywordTable[i].name);
|
keywordTable[i].len = (uint8_t)strlen(keywordTable[i].name);
|
||||||
void* ptr = &keywordTable[i];
|
void* ptr = &keywordTable[i];
|
||||||
taosHashPut(KeywordHashTable, keywordTable[i].name, keywordTable[i].len, (void*)&ptr, POINTER_BYTES);
|
taosHashPut(keywordHashTable, keywordTable[i].name, keywordTable[i].len, (void*)&ptr, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -282,7 +282,7 @@ int tSQLKeywordCode(const char* z, int n) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SKeyword** pKey = (SKeyword**)taosHashGet(KeywordHashTable, key, n);
|
SKeyword** pKey = (SKeyword**)taosHashGet(keywordHashTable, key, n);
|
||||||
return (pKey != NULL)? (*pKey)->type:TK_ID;
|
return (pKey != NULL)? (*pKey)->type:TK_ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -660,5 +660,8 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgn
|
||||||
bool isKeyWord(const char* z, int32_t len) { return (tSQLKeywordCode((char*)z, len) != TK_ID); }
|
bool isKeyWord(const char* z, int32_t len) { return (tSQLKeywordCode((char*)z, len) != TK_ID); }
|
||||||
|
|
||||||
void taosCleanupKeywordsTable() {
|
void taosCleanupKeywordsTable() {
|
||||||
taosHashCleanup(KeywordHashTable);
|
void* m = keywordHashTable;
|
||||||
|
if (m != NULL && atomic_val_compare_exchange_ptr(&keywordHashTable, m, 0) == m) {
|
||||||
|
taosHashCleanup(m);
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -1076,6 +1076,13 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
||||||
tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code);
|
tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code);
|
||||||
}
|
}
|
||||||
} else { // msg is passed to app only parsing is ok
|
} else { // msg is passed to app only parsing is ok
|
||||||
|
|
||||||
|
if (pHead->msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
|
||||||
|
rpcSendQuickRsp(pConn, TSDB_CODE_SUCCESS);
|
||||||
|
rpcFreeMsg(pRecv->msg);
|
||||||
|
return pConn;
|
||||||
|
}
|
||||||
|
|
||||||
rpcProcessIncomingMsg(pConn, pHead, pContext);
|
rpcProcessIncomingMsg(pConn, pHead, pContext);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -562,12 +562,12 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) {
|
||||||
void tsdbRefTable(STable *pTable) {
|
void tsdbRefTable(STable *pTable) {
|
||||||
int32_t ref = T_REF_INC(pTable);
|
int32_t ref = T_REF_INC(pTable);
|
||||||
UNUSED(ref);
|
UNUSED(ref);
|
||||||
// tsdbDebug("ref table %"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref);
|
tsdbDebug("ref table %s uid %" PRIu64 " tid:%d, refCount:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), ref);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbUnRefTable(STable *pTable) {
|
void tsdbUnRefTable(STable *pTable) {
|
||||||
int32_t ref = T_REF_DEC(pTable);
|
int32_t ref = T_REF_DEC(pTable);
|
||||||
tsdbDebug("unref table uid:%"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref);
|
tsdbDebug("unref table %s uid:%"PRIu64" tid:%d, refCount:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), ref);
|
||||||
|
|
||||||
if (ref == 0) {
|
if (ref == 0) {
|
||||||
// tsdbDebug("destory table name:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable));
|
// tsdbDebug("destory table name:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable));
|
||||||
|
@ -745,7 +745,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper) {
|
||||||
|
|
||||||
T_REF_INC(pTable);
|
T_REF_INC(pTable);
|
||||||
|
|
||||||
tsdbTrace("table %s tid %d uid %" PRIu64 " is created", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable),
|
tsdbDebug("table %s tid %d uid %" PRIu64 " is created", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable),
|
||||||
TABLE_UID(pTable));
|
TABLE_UID(pTable));
|
||||||
|
|
||||||
return pTable;
|
return pTable;
|
||||||
|
@ -889,7 +889,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lock) tsdbUnlockRepoMeta(pRepo);
|
if (lock) tsdbUnlockRepoMeta(pRepo);
|
||||||
tsdbDebug("vgId:%d table %s is removed from meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable));
|
tsdbDebug("vgId:%d table %s uid %" PRIu64 " is removed from meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_UID(pTable));
|
||||||
tsdbUnRefTable(pTable);
|
tsdbUnRefTable(pTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2121,7 +2121,16 @@ STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// clear current group
|
// clear current group, unref unused table
|
||||||
|
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||||
|
STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);
|
||||||
|
|
||||||
|
// keyInfo.pTable may be NULL here.
|
||||||
|
if (pKeyInfo->pTable != keyInfo.pTable) {
|
||||||
|
tsdbUnRefTable(pKeyInfo->pTable);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayClear(pGroup);
|
taosArrayClear(pGroup);
|
||||||
|
|
||||||
// more than one table in each group, only one table left for each group
|
// more than one table in each group, only one table left for each group
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
||||||
PROJECT(TDengine)
|
PROJECT(TDengine)
|
||||||
|
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/rpc/inc)
|
||||||
AUX_SOURCE_DIRECTORY(src SRC)
|
AUX_SOURCE_DIRECTORY(src SRC)
|
||||||
ADD_LIBRARY(tutil ${SRC})
|
ADD_LIBRARY(tutil ${SRC})
|
||||||
TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4 z)
|
TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4 z)
|
||||||
|
|
|
@ -20,7 +20,27 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
void taosNetTest(const char* host, uint16_t port, uint16_t endPort, int pktLen, const char* netTestRole);
|
typedef struct CmdArguments {
|
||||||
|
char* host;
|
||||||
|
char* password;
|
||||||
|
char* user;
|
||||||
|
char* auth;
|
||||||
|
char* database;
|
||||||
|
char* timezone;
|
||||||
|
bool is_raw_time;
|
||||||
|
bool is_use_passwd;
|
||||||
|
char file[TSDB_FILENAME_LEN];
|
||||||
|
char dir[TSDB_FILENAME_LEN];
|
||||||
|
int threadNum;
|
||||||
|
char* commands;
|
||||||
|
int abort;
|
||||||
|
int port;
|
||||||
|
int endPort;
|
||||||
|
int pktLen;
|
||||||
|
char* netTestRole;
|
||||||
|
} CmdArguments;
|
||||||
|
|
||||||
|
void taosNetTest(CmdArguments* args);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,15 +21,42 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int taosOpenRef(int max, void (*fp)(void *)); // return refId which will be used by other APIs
|
// open an instance, return refId which will be used by other APIs
|
||||||
void taosCloseRef(int refId);
|
int taosOpenRef(int max, void (*fp)(void *));
|
||||||
int taosListRef(); // return the number of references in system
|
|
||||||
int taosAddRef(int refId, void *p);
|
|
||||||
int taosAcquireRef(int refId, void *p);
|
|
||||||
void taosReleaseRef(int refId, void *p);
|
|
||||||
|
|
||||||
|
// close the Ref instance
|
||||||
|
void taosCloseRef(int refId);
|
||||||
|
|
||||||
|
// add ref, p is the pointer to resource or pointer ID
|
||||||
|
int taosAddRef(int refId, void *p);
|
||||||
#define taosRemoveRef taosReleaseRef
|
#define taosRemoveRef taosReleaseRef
|
||||||
|
|
||||||
|
// acquire ref, p is the pointer to resource or pointer ID
|
||||||
|
int taosAcquireRef(int refId, void *p);
|
||||||
|
|
||||||
|
// release ref, p is the pointer to resource or pinter ID
|
||||||
|
void taosReleaseRef(int refId, void *p);
|
||||||
|
|
||||||
|
// return the first if p is null, otherwise return the next after p
|
||||||
|
void *taosIterateRef(int refId, void *p);
|
||||||
|
|
||||||
|
// return the number of references in system
|
||||||
|
int taosListRef();
|
||||||
|
|
||||||
|
/* sample code to iterate the refs
|
||||||
|
|
||||||
|
void demoIterateRefs(int refId) {
|
||||||
|
|
||||||
|
void *p = taosIterateRef(refId, NULL);
|
||||||
|
while (p) {
|
||||||
|
|
||||||
|
// process P
|
||||||
|
|
||||||
|
p = taosIterateRef(refId, p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,11 +15,16 @@
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
|
#include "taosmsg.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tulog.h"
|
#include "tulog.h"
|
||||||
#include "tconfig.h"
|
#include "tconfig.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "tsocket.h"
|
#include "tsocket.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
#include "rpcHead.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
#include "tnettest.h"
|
||||||
|
|
||||||
#define MAX_PKG_LEN (64*1000)
|
#define MAX_PKG_LEN (64*1000)
|
||||||
#define BUFFER_SIZE (MAX_PKG_LEN + 1024)
|
#define BUFFER_SIZE (MAX_PKG_LEN + 1024)
|
||||||
|
@ -30,9 +35,15 @@ typedef struct {
|
||||||
uint16_t pktLen;
|
uint16_t pktLen;
|
||||||
} info_s;
|
} info_s;
|
||||||
|
|
||||||
static char serverFqdn[TSDB_FQDN_LEN];
|
extern int tsRpcMaxUdpSize;
|
||||||
|
|
||||||
|
static char g_user[TSDB_USER_LEN+1] = {0};
|
||||||
|
static char g_pass[TSDB_PASSWORD_LEN+1] = {0};
|
||||||
|
static char g_serverFqdn[TSDB_FQDN_LEN] = {0};
|
||||||
static uint16_t g_startPort = 0;
|
static uint16_t g_startPort = 0;
|
||||||
static uint16_t g_endPort = 6042;
|
static uint16_t g_endPort = 6042;
|
||||||
|
static uint32_t g_pktLen = 0;
|
||||||
|
|
||||||
|
|
||||||
static void *bindUdpPort(void *sarg) {
|
static void *bindUdpPort(void *sarg) {
|
||||||
info_s *pinfo = (info_s *)sarg;
|
info_s *pinfo = (info_s *)sarg;
|
||||||
|
@ -321,20 +332,146 @@ static void checkPort(uint32_t hostIp, uint16_t startPort, uint16_t maxPort, uin
|
||||||
return ;
|
return ;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosNetTestClient(const char* serverFqdn, uint16_t startPort, uint16_t endPort, int pktLen) {
|
void* tnetInitRpc(char* secretEncrypt, char spi) {
|
||||||
uint32_t serverIp = taosGetIpFromFqdn(serverFqdn);
|
SRpcInit rpcInit;
|
||||||
|
void* pRpcConn = NULL;
|
||||||
|
|
||||||
|
taosEncryptPass((uint8_t *)g_pass, strlen(g_pass), secretEncrypt);
|
||||||
|
|
||||||
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
|
rpcInit.localPort = 0;
|
||||||
|
rpcInit.label = "NET-TEST";
|
||||||
|
rpcInit.numOfThreads = 1; // every DB connection has only one thread
|
||||||
|
rpcInit.cfp = NULL;
|
||||||
|
rpcInit.sessions = 16;
|
||||||
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
|
rpcInit.user = g_user;
|
||||||
|
rpcInit.idleTime = 2000;
|
||||||
|
rpcInit.ckey = "key";
|
||||||
|
rpcInit.spi = spi;
|
||||||
|
rpcInit.secret = secretEncrypt;
|
||||||
|
|
||||||
|
pRpcConn = rpcOpen(&rpcInit);
|
||||||
|
return pRpcConn;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int rpcCheckPortImpl(const char* serverFqdn, uint16_t port, uint16_t pktLen, char spi) {
|
||||||
|
SRpcEpSet epSet;
|
||||||
|
SRpcMsg reqMsg;
|
||||||
|
SRpcMsg rspMsg;
|
||||||
|
void* pRpcConn;
|
||||||
|
|
||||||
|
char secretEncrypt[32] = {0};
|
||||||
|
|
||||||
|
pRpcConn = tnetInitRpc(secretEncrypt, spi);
|
||||||
|
if (NULL == pRpcConn) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&epSet, 0, sizeof(SRpcEpSet));
|
||||||
|
epSet.inUse = 0;
|
||||||
|
epSet.numOfEps = 1;
|
||||||
|
epSet.port[0] = port;
|
||||||
|
strcpy(epSet.fqdn[0], serverFqdn);
|
||||||
|
|
||||||
|
reqMsg.msgType = TSDB_MSG_TYPE_NETWORK_TEST;
|
||||||
|
reqMsg.pCont = rpcMallocCont(pktLen);
|
||||||
|
reqMsg.contLen = pktLen;
|
||||||
|
reqMsg.code = 0;
|
||||||
|
reqMsg.handle = NULL; // rpc handle returned to app
|
||||||
|
reqMsg.ahandle = NULL; // app handle set by client
|
||||||
|
|
||||||
|
rpcSendRecv(pRpcConn, &epSet, &reqMsg, &rspMsg);
|
||||||
|
|
||||||
|
// handle response
|
||||||
|
if ((rspMsg.code != 0) || (rspMsg.msgType != TSDB_MSG_TYPE_NETWORK_TEST + 1)) {
|
||||||
|
//printf("code:%d[%s]\n", rspMsg.code, tstrerror(rspMsg.code));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
rpcFreeCont(rspMsg.pCont);
|
||||||
|
|
||||||
|
rpcClose(pRpcConn);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void rpcCheckPort(uint32_t hostIp) {
|
||||||
|
int ret;
|
||||||
|
char spi;
|
||||||
|
|
||||||
|
for (uint16_t port = g_startPort; port <= g_endPort; port++) {
|
||||||
|
//printf("test: %s:%d\n", info.host, port);
|
||||||
|
printf("\n");
|
||||||
|
|
||||||
|
//================ check tcp port ================
|
||||||
|
int32_t pktLen;
|
||||||
|
if (g_pktLen <= tsRpcMaxUdpSize) {
|
||||||
|
pktLen = tsRpcMaxUdpSize + 1000;
|
||||||
|
} else {
|
||||||
|
pktLen = g_pktLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
spi = 1;
|
||||||
|
ret = rpcCheckPortImpl(g_serverFqdn, port, pktLen, spi);
|
||||||
|
if (ret != 0) {
|
||||||
|
spi = 0;
|
||||||
|
ret = rpcCheckPortImpl(g_serverFqdn, port, pktLen, spi);
|
||||||
|
if (ret != 0) {
|
||||||
|
printf("TCP port:%d test fail.\t\t", port);
|
||||||
|
} else {
|
||||||
|
//printf("tcp port:%d test ok.\t\t", port);
|
||||||
|
printf("TCP port:\033[32m%d test OK\033[0m\t\t", port);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
//printf("tcp port:%d test ok.\t\t", port);
|
||||||
|
printf("TCP port:\033[32m%d test OK\033[0m\t\t", port);
|
||||||
|
}
|
||||||
|
|
||||||
|
//================ check udp port ================
|
||||||
|
if (g_pktLen >= tsRpcMaxUdpSize) {
|
||||||
|
pktLen = tsRpcMaxUdpSize - 1000;
|
||||||
|
} else {
|
||||||
|
pktLen = g_pktLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
spi = 0;
|
||||||
|
ret = rpcCheckPortImpl(g_serverFqdn, port, pktLen, spi);
|
||||||
|
if (ret != 0) {
|
||||||
|
spi = 1;
|
||||||
|
ret = rpcCheckPortImpl(g_serverFqdn, port, pktLen, spi);
|
||||||
|
if (ret != 0) {
|
||||||
|
printf("udp port:%d test fail.\t\n", port);
|
||||||
|
} else {
|
||||||
|
//printf("udp port:%d test ok.\t\n", port);
|
||||||
|
printf("UDP port:\033[32m%d test OK\033[0m\t\n", port);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
//printf("udp port:%d test ok.\t\n", port);
|
||||||
|
printf("UDP port:\033[32m%d test OK\033[0m\t\n", port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("\n");
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void taosNetTestClient(int flag) {
|
||||||
|
uint32_t serverIp = taosGetIpFromFqdn(g_serverFqdn);
|
||||||
if (serverIp == 0xFFFFFFFF) {
|
if (serverIp == 0xFFFFFFFF) {
|
||||||
printf("Failed to resolve FQDN:%s", serverFqdn);
|
printf("Failed to resolve FQDN:%s", g_serverFqdn);
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
checkPort(serverIp, startPort, endPort, pktLen);
|
if (0 == flag) {
|
||||||
|
checkPort(serverIp, g_startPort, g_endPort, g_pktLen);
|
||||||
|
} else {
|
||||||
|
rpcCheckPort(serverIp);
|
||||||
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static void taosNetTestServer(uint16_t startPort, uint16_t endPort, int pktLen) {
|
static void taosNetTestServer(uint16_t startPort, uint16_t endPort, int pktLen) {
|
||||||
|
|
||||||
int port = startPort;
|
int port = startPort;
|
||||||
|
@ -375,49 +512,66 @@ static void taosNetTestServer(uint16_t startPort, uint16_t endPort, int pktLen)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void taosNetTest(const char* host, uint16_t port, uint16_t endPort, int pktLen, const char* netTestRole) {
|
void taosNetTest(CmdArguments *args) {
|
||||||
if (pktLen > MAX_PKG_LEN) {
|
if (0 == args->pktLen) {
|
||||||
printf("test packet len overflow: %d, max len not greater than %d bytes\n", pktLen, MAX_PKG_LEN);
|
g_pktLen = 1000;
|
||||||
exit(-1);
|
} else {
|
||||||
|
g_pktLen = args->pktLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (port && endPort) {
|
if (args->port && args->endPort) {
|
||||||
if (port > endPort) {
|
if (args->port > args->endPort) {
|
||||||
printf("endPort[%d] must not lesss port[%d]\n", endPort, port);
|
printf("endPort[%d] must not lesss port[%d]\n", args->endPort, args->port);
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (host && host[0] != 0) {
|
if (args->host && args->host[0] != 0) {
|
||||||
if (strlen(host) >= TSDB_EP_LEN) {
|
if (strlen(args->host) >= TSDB_EP_LEN) {
|
||||||
printf("host invalid: %s\n", host);
|
printf("host invalid: %s\n", args->host);
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosGetFqdnPortFromEp(host, serverFqdn, &g_startPort);
|
taosGetFqdnPortFromEp(args->host, g_serverFqdn, &g_startPort);
|
||||||
} else {
|
} else {
|
||||||
tstrncpy(serverFqdn, "127.0.0.1", TSDB_IPv4ADDR_LEN);
|
tstrncpy(g_serverFqdn, "127.0.0.1", TSDB_IPv4ADDR_LEN);
|
||||||
g_startPort = tsServerPort;
|
g_startPort = tsServerPort;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (port) {
|
if (args->port) {
|
||||||
g_startPort = port;
|
g_startPort = args->port;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (endPort) {
|
if (args->endPort) {
|
||||||
g_endPort = endPort;
|
g_endPort = args->endPort;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (port > endPort) {
|
if (g_startPort > g_endPort) {
|
||||||
printf("endPort[%d] must not lesss port[%d]\n", g_endPort, g_startPort);
|
printf("endPort[%d] must not lesss port[%d]\n", g_endPort, g_startPort);
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (0 == strcmp("client", netTestRole)) {
|
if (args->is_use_passwd) {
|
||||||
printf("host: %s\tstart port: %d\tend port: %d\tpacket len: %d\n", serverFqdn, g_startPort, g_endPort, pktLen);
|
if (args->password == NULL) args->password = getpass("Enter password: ");
|
||||||
taosNetTestClient(serverFqdn, g_startPort, g_endPort, pktLen);
|
} else {
|
||||||
} else if (0 == strcmp("server", netTestRole)) {
|
args->password = TSDB_DEFAULT_PASS;
|
||||||
taosNetTestServer(g_startPort, g_endPort, pktLen);
|
}
|
||||||
|
tstrncpy(g_pass, args->password, TSDB_PASSWORD_LEN);
|
||||||
|
|
||||||
|
if (args->user == NULL) {
|
||||||
|
args->user = TSDB_DEFAULT_USER;
|
||||||
|
}
|
||||||
|
tstrncpy(g_user, args->user, TSDB_USER_LEN);
|
||||||
|
|
||||||
|
if (0 == strcmp("client", args->netTestRole)) {
|
||||||
|
printf("host: %s\tstart port: %d\tend port: %d\tpacket len: %d\n", g_serverFqdn, g_startPort, g_endPort, g_pktLen);
|
||||||
|
taosNetTestClient(0);
|
||||||
|
} else if (0 == strcmp("clients", args->netTestRole)) {
|
||||||
|
printf("host: %s\tstart port: %d\tend port: %d\tpacket len: %d\n", g_serverFqdn, g_startPort, g_endPort, g_pktLen);
|
||||||
|
taosNetTestClient(1);
|
||||||
|
} else if (0 == strcmp("server", args->netTestRole)) {
|
||||||
|
taosNetTestServer(g_startPort, g_endPort, g_pktLen);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -143,8 +143,6 @@ int taosAddRef(int refId, void *p)
|
||||||
return TSDB_CODE_REF_INVALID_ID;
|
return TSDB_CODE_REF_INVALID_ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
uTrace("refId:%d p:%p try to add", refId, p);
|
|
||||||
|
|
||||||
pSet = tsRefSetList + refId;
|
pSet = tsRefSetList + refId;
|
||||||
taosIncRefCount(pSet);
|
taosIncRefCount(pSet);
|
||||||
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
||||||
|
@ -203,8 +201,6 @@ int taosAcquireRef(int refId, void *p)
|
||||||
return TSDB_CODE_REF_INVALID_ID;
|
return TSDB_CODE_REF_INVALID_ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
uTrace("refId:%d p:%p try to acquire", refId, p);
|
|
||||||
|
|
||||||
pSet = tsRefSetList + refId;
|
pSet = tsRefSetList + refId;
|
||||||
taosIncRefCount(pSet);
|
taosIncRefCount(pSet);
|
||||||
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
||||||
|
@ -254,8 +250,6 @@ void taosReleaseRef(int refId, void *p)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
uTrace("refId:%d p:%p try to release", refId, p);
|
|
||||||
|
|
||||||
pSet = tsRefSetList + refId;
|
pSet = tsRefSetList + refId;
|
||||||
if (pSet->state == TSDB_REF_STATE_EMPTY) {
|
if (pSet->state == TSDB_REF_STATE_EMPTY) {
|
||||||
uTrace("refId:%d p:%p failed to release, cleaned", refId, p);
|
uTrace("refId:%d p:%p failed to release, cleaned", refId, p);
|
||||||
|
@ -305,6 +299,75 @@ void taosReleaseRef(int refId, void *p)
|
||||||
if (released) taosDecRefCount(pSet);
|
if (released) taosDecRefCount(pSet);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if p is NULL, return the first p in hash list, otherwise, return the next after p
|
||||||
|
void *taosIterateRef(int refId, void *p) {
|
||||||
|
SRefNode *pNode = NULL;
|
||||||
|
SRefSet *pSet;
|
||||||
|
|
||||||
|
if (refId < 0 || refId >= TSDB_REF_OBJECTS) {
|
||||||
|
uTrace("refId:%d p:%p failed to iterate, refId not valid", refId, p);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pSet = tsRefSetList + refId;
|
||||||
|
taosIncRefCount(pSet);
|
||||||
|
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
|
||||||
|
uTrace("refId:%d p:%p failed to iterate, not active", refId, p);
|
||||||
|
taosDecRefCount(pSet);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int hash = 0;
|
||||||
|
if (p) {
|
||||||
|
hash = taosHashRef(pSet, p);
|
||||||
|
taosLockList(pSet->lockedBy+hash);
|
||||||
|
|
||||||
|
pNode = pSet->nodeList[hash];
|
||||||
|
while (pNode) {
|
||||||
|
if (pNode->p == p) break;
|
||||||
|
pNode = pNode->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pNode == NULL) {
|
||||||
|
uError("refId:%d p:%p not there, quit", refId, p);
|
||||||
|
taosUnlockList(pSet->lockedBy+hash);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// p is there
|
||||||
|
pNode = pNode->next;
|
||||||
|
if (pNode == NULL) {
|
||||||
|
taosUnlockList(pSet->lockedBy+hash);
|
||||||
|
hash++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pNode == NULL) {
|
||||||
|
for (; hash < pSet->max; ++hash) {
|
||||||
|
taosLockList(pSet->lockedBy+hash);
|
||||||
|
pNode = pSet->nodeList[hash];
|
||||||
|
if (pNode) break;
|
||||||
|
taosUnlockList(pSet->lockedBy+hash);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void *newP = NULL;
|
||||||
|
if (pNode) {
|
||||||
|
pNode->count++; // acquire it
|
||||||
|
newP = pNode->p;
|
||||||
|
taosUnlockList(pSet->lockedBy+hash);
|
||||||
|
uTrace("refId:%d p:%p is returned", refId, p);
|
||||||
|
} else {
|
||||||
|
uTrace("refId:%d p:%p the list is over", refId, p);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p) taosReleaseRef(refId, p); // release the current one
|
||||||
|
|
||||||
|
taosDecRefCount(pSet);
|
||||||
|
|
||||||
|
return newP;
|
||||||
|
}
|
||||||
|
|
||||||
int taosListRef() {
|
int taosListRef() {
|
||||||
SRefSet *pSet;
|
SRefSet *pSet;
|
||||||
SRefNode *pNode;
|
SRefNode *pNode;
|
||||||
|
|
|
@ -326,6 +326,7 @@ int32_t taosHexStrToByteArray(char hexstr[], char bytes[]) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO move to comm module
|
||||||
bool taosGetVersionNumber(char *versionStr, int *versionNubmer) {
|
bool taosGetVersionNumber(char *versionStr, int *versionNubmer) {
|
||||||
if (versionStr == NULL || versionNubmer == NULL) {
|
if (versionStr == NULL || versionNubmer == NULL) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -17,6 +17,19 @@ typedef struct {
|
||||||
void **p;
|
void **p;
|
||||||
} SRefSpace;
|
} SRefSpace;
|
||||||
|
|
||||||
|
void iterateRefs(int refId) {
|
||||||
|
int count = 0;
|
||||||
|
|
||||||
|
void *p = taosIterateRef(refId, NULL);
|
||||||
|
while (p) {
|
||||||
|
// process P
|
||||||
|
count++;
|
||||||
|
p = taosIterateRef(refId, p);
|
||||||
|
}
|
||||||
|
|
||||||
|
printf(" %d ", count);
|
||||||
|
}
|
||||||
|
|
||||||
void *takeRefActions(void *param) {
|
void *takeRefActions(void *param) {
|
||||||
SRefSpace *pSpace = (SRefSpace *)param;
|
SRefSpace *pSpace = (SRefSpace *)param;
|
||||||
int code, id;
|
int code, id;
|
||||||
|
@ -44,6 +57,9 @@ void *takeRefActions(void *param) {
|
||||||
usleep(id % 5 + 1);
|
usleep(id % 5 + 1);
|
||||||
taosReleaseRef(pSpace->refId, pSpace->p[id]);
|
taosReleaseRef(pSpace->refId, pSpace->p[id]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
id = random() % pSpace->refNum;
|
||||||
|
iterateRefs(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i=0; i < pSpace->refNum; ++i) {
|
for (int i=0; i < pSpace->refNum; ++i) {
|
||||||
|
@ -63,7 +79,7 @@ void *openRefSpace(void *param) {
|
||||||
SRefSpace *pSpace = (SRefSpace *)param;
|
SRefSpace *pSpace = (SRefSpace *)param;
|
||||||
|
|
||||||
printf("c");
|
printf("c");
|
||||||
pSpace->refId = taosOpenRef(10000, myfree);
|
pSpace->refId = taosOpenRef(50, myfree);
|
||||||
|
|
||||||
if (pSpace->refId < 0) {
|
if (pSpace->refId < 0) {
|
||||||
printf("failed to open ref, reson:%s\n", tstrerror(pSpace->refId));
|
printf("failed to open ref, reson:%s\n", tstrerror(pSpace->refId));
|
||||||
|
|
|
@ -154,6 +154,7 @@ python3 ./test.py -f query/queryConnection.py
|
||||||
python3 ./test.py -f query/queryCountCSVData.py
|
python3 ./test.py -f query/queryCountCSVData.py
|
||||||
python3 ./test.py -f query/natualInterval.py
|
python3 ./test.py -f query/natualInterval.py
|
||||||
python3 ./test.py -f query/bug1471.py
|
python3 ./test.py -f query/bug1471.py
|
||||||
|
python3 ./test.py -f query/dataLossTest.py
|
||||||
|
|
||||||
#stream
|
#stream
|
||||||
python3 ./test.py -f stream/metric_1.py
|
python3 ./test.py -f stream/metric_1.py
|
||||||
|
|
|
@ -0,0 +1,76 @@
|
||||||
|
###################################################################
|
||||||
|
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# This file is proprietary and confidential to TAOS Technologies.
|
||||||
|
# No part of this file may be reproduced, stored, transmitted,
|
||||||
|
# disclosed or used in any form or by any means other than as
|
||||||
|
# expressly provided by the written permission from Jianhui Tao
|
||||||
|
#
|
||||||
|
###################################################################
|
||||||
|
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import taos
|
||||||
|
import os
|
||||||
|
from util.log import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.dnodes import *
|
||||||
|
import inspect
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def init(self, conn, logSql):
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
|
||||||
|
self.numberOfTables = 240
|
||||||
|
self.numberOfRecords = 10000
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
tdSql.prepare()
|
||||||
|
|
||||||
|
os.system("yes | taosdemo -t %d -n %d" % (self.numberOfTables, self.numberOfRecords))
|
||||||
|
print("==============step1")
|
||||||
|
|
||||||
|
tdSql.execute("use test")
|
||||||
|
sql = "select count(*) from meters"
|
||||||
|
tdSql.query(sql)
|
||||||
|
rows = tdSql.getData(0, 0)
|
||||||
|
print ("number of records: %d" % rows)
|
||||||
|
|
||||||
|
newRows = rows
|
||||||
|
for i in range(10000):
|
||||||
|
print("kill taosd")
|
||||||
|
time.sleep(10)
|
||||||
|
os.system("sudo kill -9 $(pgrep taosd)")
|
||||||
|
tdDnodes.startWithoutSleep(1)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
tdSql.query(sql)
|
||||||
|
newRows = tdSql.getData(0, 0)
|
||||||
|
print("numer of records after kill taosd %d" % newRows)
|
||||||
|
time.sleep(10)
|
||||||
|
break
|
||||||
|
except Exception as e:
|
||||||
|
pass
|
||||||
|
continue
|
||||||
|
|
||||||
|
if newRows < rows:
|
||||||
|
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||||
|
args = (caller.filename, caller.lineno, sql, newRows, rows)
|
||||||
|
tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args)
|
||||||
|
break
|
||||||
|
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkData(0, 0, rows)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -15,6 +15,7 @@ import sys
|
||||||
import os
|
import os
|
||||||
import os.path
|
import os.path
|
||||||
import subprocess
|
import subprocess
|
||||||
|
from time import sleep
|
||||||
from util.log import *
|
from util.log import *
|
||||||
|
|
||||||
|
|
||||||
|
@ -210,6 +211,7 @@ class TDDnode:
|
||||||
(self.index, self.cfgPath))
|
(self.index, self.cfgPath))
|
||||||
|
|
||||||
def getBuildPath(self):
|
def getBuildPath(self):
|
||||||
|
buildPath = ""
|
||||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
if ("community" in selfPath):
|
if ("community" in selfPath):
|
||||||
|
@ -256,6 +258,35 @@ class TDDnode:
|
||||||
|
|
||||||
tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index))
|
tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index))
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
|
|
||||||
|
def startWithoutSleep(self):
|
||||||
|
buildPath = self.getBuildPath()
|
||||||
|
|
||||||
|
if (buildPath == ""):
|
||||||
|
tdLog.exit("taosd not found!")
|
||||||
|
else:
|
||||||
|
tdLog.info("taosd found in %s" % buildPath)
|
||||||
|
|
||||||
|
binPath = buildPath + "/build/bin/taosd"
|
||||||
|
|
||||||
|
if self.deployed == 0:
|
||||||
|
tdLog.exit("dnode:%d is not deployed" % (self.index))
|
||||||
|
|
||||||
|
if self.valgrind == 0:
|
||||||
|
cmd = "nohup %s -c %s > /dev/null 2>&1 & " % (
|
||||||
|
binPath, self.cfgDir)
|
||||||
|
else:
|
||||||
|
valgrindCmdline = "valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes"
|
||||||
|
|
||||||
|
cmd = "nohup %s %s -c %s 2>&1 & " % (
|
||||||
|
valgrindCmdline, binPath, self.cfgDir)
|
||||||
|
|
||||||
|
print(cmd)
|
||||||
|
|
||||||
|
if os.system(cmd) != 0:
|
||||||
|
tdLog.exit(cmd)
|
||||||
|
self.running = 1
|
||||||
|
tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
if self.valgrind == 0:
|
if self.valgrind == 0:
|
||||||
|
@ -425,6 +456,10 @@ class TDDnodes:
|
||||||
def start(self, index):
|
def start(self, index):
|
||||||
self.check(index)
|
self.check(index)
|
||||||
self.dnodes[index - 1].start()
|
self.dnodes[index - 1].start()
|
||||||
|
|
||||||
|
def startWithoutSleep(self, index):
|
||||||
|
self.check(index)
|
||||||
|
self.dnodes[index - 1].startWithoutSleep()
|
||||||
|
|
||||||
def stop(self, index):
|
def stop(self, index):
|
||||||
self.check(index)
|
self.check(index)
|
||||||
|
|
|
@ -25,7 +25,7 @@ class TDSql:
|
||||||
self.queryCols = 0
|
self.queryCols = 0
|
||||||
self.affectedRows = 0
|
self.affectedRows = 0
|
||||||
|
|
||||||
def init(self, cursor, log=True):
|
def init(self, cursor, log=False):
|
||||||
self.cursor = cursor
|
self.cursor = cursor
|
||||||
|
|
||||||
if (log):
|
if (log):
|
||||||
|
|
|
@ -132,4 +132,134 @@ sql select join_mt0.ts, join_mt1.t1, join_mt0.t1, join_mt1.tbname, join_mt0.tbna
|
||||||
|
|
||||||
sql select join_mt0.ts, join_mt1.t1, join_mt0.t1, join_mt1.tbname, join_mt0.tbname from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1 limit 1
|
sql select join_mt0.ts, join_mt1.t1, join_mt0.t1, join_mt1.tbname, join_mt0.tbname from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1 limit 1
|
||||||
|
|
||||||
|
#1970-01-01 08:01:40.800 | 10 | 45.000000000 | 0 | true | false | 0 |
|
||||||
|
#1970-01-01 08:01:40.790 | 10 | 945.000000000 | 90 | true | true | 0 |
|
||||||
|
|
||||||
|
sql select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc limit 20 offset 19;
|
||||||
|
if $rows != 100 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# c5 is null ! error
|
||||||
|
|
||||||
|
sql select count(join_mt0.c1), sum(join_mt0.c2)/count(*), avg(c2), first(join_mt0.c5), last(c7) from join_mt0 interval(10a) group by join_mt0.t1 order by join_mt0.ts desc;
|
||||||
|
if $rows != 100 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != @70-01-01 08:01:40.990@ then
|
||||||
|
print expect 0, actual: $data00
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 30 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 94.500000000 then
|
||||||
|
print expect 94.500000000, actual $data02
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data03 != 94.500000000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data04 != 90 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data05 != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data06 != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data10 != @70-01-01 08:01:40.980@ then
|
||||||
|
print expect 70-01-01 08:01:40.980, actual: $data10
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data11 != 30 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != 84.500000000 then
|
||||||
|
print expect 84.500000000, actual $data12
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data13 != 84.500000000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data14 != 80 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data15 != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data16 != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# this function will cause shell crash
|
||||||
|
sql select count(join_mt0.c1), first(join_mt0.c1) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc;
|
||||||
|
if $rows != 100 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != @70-01-01 08:01:40.990@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 90 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data03 != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data11 != 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != 80 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data13 != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10m) group by join_mt0.t1 order by join_mt0.ts asc;
|
||||||
|
if $rows != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data00 != @70-01-01 08:00:00.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data03 != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -324,8 +324,22 @@ sql create table tm0 using m1 tags(1);
|
||||||
sql create table tm1 using m1 tags(2);
|
sql create table tm1 using m1 tags(2);
|
||||||
|
|
||||||
sql insert into tm0 values(10000, 1) (20000, 2)(30000, 3) (40000, NULL) (50000, 2) tm1 values(10001, 2)(20000,4)(90000,9);
|
sql insert into tm0 values(10000, 1) (20000, 2)(30000, 3) (40000, NULL) (50000, 2) tm1 values(10001, 2)(20000,4)(90000,9);
|
||||||
sql select count(*),first(k),last(k) from m1 where tbname in ('tm0') interval(1s) order by ts desc;
|
|
||||||
|
|
||||||
|
#=============================tbase-1205
|
||||||
|
sql select count(*) from tm1 where ts<now and ts>= now -1d interval(1h) fill(NULL);
|
||||||
|
|
||||||
|
print ===================>TD-1834
|
||||||
|
sql select * from tm0 where ts>11000 and ts< 20000 order by ts asc
|
||||||
|
if $rows != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from tm0 where ts>11000 and ts< 20000 order by ts desc
|
||||||
|
if $rows != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select count(*),first(k),last(k) from m1 where tbname in ('tm0') interval(1s) order by ts desc;
|
||||||
if $row != 5 then
|
if $row != 5 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
@ -386,7 +400,25 @@ sql_error select k+1,sum(k) from tm0;
|
||||||
sql_error select k, sum(k) from tm0;
|
sql_error select k, sum(k) from tm0;
|
||||||
sql_error select k, sum(k)+1 from tm0;
|
sql_error select k, sum(k)+1 from tm0;
|
||||||
|
|
||||||
|
print ================== restart server to commit data into disk
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
sleep 5000
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
print ================== server restart completed
|
||||||
|
|
||||||
#=============================tbase-1205
|
#=============================tbase-1205
|
||||||
sql select count(*) from tm1 where ts<now and ts>= now -1d interval(1h) fill(NULL);
|
sql select count(*) from tm1 where ts<now and ts>= now -1d interval(1h) fill(NULL);
|
||||||
|
|
||||||
|
print ===================>TD-1834
|
||||||
|
sql select * from tm0 where ts>11000 and ts< 20000 order by ts asc
|
||||||
|
if $rows != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from tm0 where ts>11000 and ts< 20000 order by ts desc
|
||||||
|
if $rows != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue