Merge branch 'develop' into feature/2.0tsdb

This commit is contained in:
Hongze Cheng 2020-07-10 10:50:10 +08:00
commit 691dc5b4c7
24 changed files with 1050 additions and 808 deletions

View File

@ -87,6 +87,16 @@ typedef struct SVgroupTableInfo {
SArray* itemList; //SArray<STableIdInfo>
} SVgroupTableInfo;
static FORCE_INLINE SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex) {
assert(pCmd != NULL && subClauseIndex >= 0 && subClauseIndex < TSDB_MAX_UNION_CLAUSE);
if (pCmd->pQueryInfo == NULL || subClauseIndex >= pCmd->numOfClause) {
return NULL;
}
return pCmd->pQueryInfo[subClauseIndex];
}
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock);

View File

@ -477,7 +477,13 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscDebug("%p redo parse sql string to build submit block", pSql);
pCmd->parseFinished = false;
if ((code = tsParseSql(pSql, true)) == TSDB_CODE_SUCCESS) {
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
}
if (code == TSDB_CODE_SUCCESS) {
/*
* Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks,
* and send the required submit block according to index value in supporter to server.

View File

@ -340,13 +340,12 @@ bool stableQueryFunctChanged(int32_t funcId) {
*/
void resetResultInfo(SResultInfo *pResInfo) { pResInfo->initialized = false; }
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable) {
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf) {
assert(pResInfo->interResultBuf == NULL);
pResInfo->bufLen = size;
pResInfo->superTableQ = superTable;
pResInfo->interResultBuf = calloc(1, (size_t)size);
pResInfo->interResultBuf = buf;
}
// set the query flag to denote that query is completed

View File

@ -1310,6 +1310,11 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
tscDebug("%p resume to parse sql: %s", pSql, pCmd->curSql);
}
ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (TSDB_CODE_SUCCESS != ret) {
return ret;
}
if (tscIsInsertData(pSql->sqlstr)) {
/*
* Set the fp before parse the sql string, in case of getTableMeta failed, in which
@ -1326,11 +1331,6 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
ret = tsParseInsertSql(pSql);
} else {
ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (TSDB_CODE_SUCCESS != ret) {
return ret;
}
SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr);
ret = tscToSQLCmd(pSql, &SQLInfo);
SQLInfoDestroy(&SQLInfo);

View File

@ -1464,16 +1464,6 @@ STableMetaInfo* tscGetMetaInfo(SQueryInfo* pQueryInfo, int32_t tableIndex) {
return pQueryInfo->pTableMetaInfo[tableIndex];
}
SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex) {
assert(pCmd != NULL && subClauseIndex >= 0 && subClauseIndex < TSDB_MAX_UNION_CLAUSE);
if (pCmd->pQueryInfo == NULL || subClauseIndex >= pCmd->numOfClause) {
return NULL;
}
return pCmd->pQueryInfo[subClauseIndex];
}
int32_t tscGetQueryInfoDetailSafely(SSqlCmd* pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo) {
int32_t ret = TSDB_CODE_SUCCESS;
@ -2097,7 +2087,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
}
void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
SFieldSupInfo* pInfo = taosArrayGet(pFieldInfo->pSupportInfo, columnIndex);//tscFieldInfoGetSupp(pFieldInfo, columnIndex);
SFieldSupInfo* pInfo = taosArrayGet(pFieldInfo->pSupportInfo, columnIndex);
assert(pInfo->pSqlExpr != NULL);
int32_t type = pInfo->pSqlExpr->resType;
@ -2112,7 +2102,7 @@ void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t column
if (isNull(pData, type)) {
pRes->tsrow[columnIndex] = NULL;
} else {
pRes->tsrow[columnIndex] = pData + VARSTR_HEADER_SIZE;
pRes->tsrow[columnIndex] = ((tstr*)pData)->data;
}
if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor

View File

@ -269,8 +269,14 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
void* val = row[i];
if (val == NULL) {
val = getNullValue(c->type);
} else if (IS_VAR_DATA_TYPE(c->type)) {
} else if (c->type == TSDB_DATA_TYPE_BINARY) {
val = ((char*)val) - sizeof(VarDataLenT);
} else if (c->type == TSDB_DATA_TYPE_NCHAR) {
char buf[TSDB_MAX_NCHAR_LEN];
size_t len = taos_fetch_lengths(tres)[i];
taosMbsToUcs4(val, len, buf, sizeof(buf), &len);
memcpy(val + sizeof(VarDataLenT), buf, len);
varDataLen(val) = len;
}
tdAppendColVal(trow, val, c->type, c->bytes, c->offset);
}

View File

@ -471,8 +471,8 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
atomic_add_fetch_32(&pTable->autoIndex, 1);
}
sdbDebug("table:%s, insert record:%s to hash, rowSize:%d numOfRows:%" PRId64 " ver:%" PRIu64 ", msg:%p", pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj), pOper->rowSize, pTable->numOfRows, sdbGetVersion(), pOper->pMsg);
sdbDebug("table:%s, insert record:%s to hash, rowSize:%d numOfRows:%" PRId64 ", msg:%p", pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj), pOper->rowSize, pTable->numOfRows, pOper->pMsg);
(*pTable->insertFp)(pOper);
return TSDB_CODE_SUCCESS;
@ -490,8 +490,8 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
taosHashRemove(pTable->iHandle, key, keySize);
atomic_sub_fetch_32(&pTable->numOfRows, 1);
sdbDebug("table:%s, delete record:%s from hash, numOfRows:%" PRId64 " ver:%" PRIu64 ", msg:%p", pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion(), pOper->pMsg);
sdbDebug("table:%s, delete record:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, pOper->pMsg);
int8_t *updateEnd = pOper->pObj + pTable->refCountPos - 1;
*updateEnd = 1;
@ -501,8 +501,8 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
}
static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) {
sdbDebug("table:%s, update record:%s in hash, numOfRows:%" PRId64 " ver:%" PRIu64 ", msg:%p", pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion(), pOper->pMsg);
sdbDebug("table:%s, update record:%s in hash, numOfRows:%" PRId64 ", msg:%p", pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, pOper->pMsg);
(*pTable->updateFp)(pOper);
return TSDB_CODE_SUCCESS;
@ -967,7 +967,11 @@ static void *sdbWorkerFp(void *param) {
}
int32_t code = sdbWrite(pOper, pHead, type);
if (pOper && code <= 0) pOper->retCode = code;
if (code > 0) code = 0;
if (pOper)
pOper->retCode = code;
else
pHead->len = code; // hackway
}
walFsync(tsSdbObj.wal);
@ -982,7 +986,8 @@ static void *sdbWorkerFp(void *param) {
sdbDecRef(pOper->table, pOper->pObj);
sdbConfirmForward(NULL, pOper, pOper->retCode);
} else if (type == TAOS_QTYPE_FWD) {
syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS);
pHead = (SWalHead *)item;
syncConfirmForward(tsSdbObj.sync, pHead->version, pHead->len);
taosFreeQitem(item);
} else {
taosFreeQitem(item);

View File

@ -310,7 +310,8 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
code = TSDB_CODE_MND_DB_IN_DROPPING;
goto connect_over;
}
mnodeDecDbRef(pDb);
}
@ -355,7 +356,7 @@ static int32_t mnodeProcessUseMsg(SMnodeMsg *pMsg) {
int32_t code = TSDB_CODE_SUCCESS;
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pUseDbMsg->db);
if (pMsg->pDb == NULL) {
code = TSDB_CODE_MND_INVALID_DB;
return TSDB_CODE_MND_INVALID_DB;
}
if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {

View File

@ -172,6 +172,7 @@ typedef struct SQueryRuntimeEnv {
bool topBotQuery; // false
bool groupbyNormalCol; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
int32_t interBufSize; // intermediate buffer sizse
int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
} SQueryRuntimeEnv;

View File

@ -15,6 +15,8 @@
#ifndef TDENGINE_QUERYUTIL_H
#define TDENGINE_QUERYUTIL_H
int32_t getOutputInterResultBufSize(SQuery* pQuery);
void clearTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* pOneOutputRes);
void copyTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* dst, const SWindowResult* src);
@ -35,7 +37,7 @@ SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot);
#define curTimeWindow(_winres) ((_winres)->curIndex)
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo);
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo, size_t interBufSize);
char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult);

View File

@ -272,7 +272,7 @@ bool top_bot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, char *mi
bool stableQueryFunctChanged(int32_t funcId);
void resetResultInfo(SResultInfo *pResInfo);
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable);
void setResultInfoBuf(SResultInfo *pResInfo, int32_t size, bool superTable, char* buf);
static FORCE_INLINE void initResultInfo(SResultInfo *pResInfo) {
pResInfo->initialized = true; // the this struct has been initialized flag

View File

@ -123,6 +123,14 @@ static void setQueryStatus(SQuery *pQuery, int8_t status);
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0)
// previous time window may not be of the same size of pQuery->intervalTime
#define GET_NEXT_TIMEWINDOW(_q, tw) \
do { \
int32_t factor = GET_FORWARD_DIRECTION_FACTOR((_q)->order.order); \
(tw)->skey += ((_q)->slidingTime * factor); \
(tw)->ekey = (tw)->skey + ((_q)->intervalTime - 1); \
} while (0)
// todo move to utility
static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *group);
@ -130,7 +138,6 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
static void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult);
static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow);
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
SDataStatis *pStatis, void *param, int32_t colIndex);
@ -419,7 +426,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
for (int32_t i = pWindowResInfo->capacity; i < newCap; ++i) {
SPosInfo pos = {-1, -1};
createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &pos);
createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &pos, pRuntimeEnv->interBufSize);
}
pWindowResInfo->capacity = newCap;
}
@ -551,19 +558,29 @@ static SWindowStatus *getTimeWindowResStatus(SWindowResInfo *pWindowResInfo, int
static int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int16_t pos,
int16_t order, int64_t *pData) {
int32_t endPos = searchFn((char *)pData, numOfRows, ekey, order);
int32_t forwardStep = 0;
if (endPos >= 0) {
forwardStep = (order == TSDB_ORDER_ASC) ? (endPos - pos) : (pos - endPos);
assert(forwardStep >= 0);
if (order == TSDB_ORDER_ASC) {
int32_t end = searchFn((char*) &pData[pos], numOfRows - pos, ekey, order);
if (end >= 0) {
forwardStep = end;
// endPos data is equalled to the key so, we do need to read the element in endPos
if (pData[endPos] == ekey) {
if (pData[end + pos] == ekey) {
forwardStep += 1;
}
}
} else {
int32_t end = searchFn((char *)pData, pos + 1, ekey, order);
if (end >= 0) {
forwardStep = pos - end;
if (pData[end] == ekey) {
forwardStep += 1;
}
}
}
assert(forwardStep > 0);
return forwardStep;
}
@ -686,7 +703,7 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
}
}
assert(num >= 0);
assert(num > 0);
return num;
}
@ -736,59 +753,60 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus
}
}
static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pNextWin,
SDataBlockInfo *pDataBlockInfo, TSKEY *primaryKeys,
__block_search_fn_t searchFn) {
static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pNext, SDataBlockInfo *pDataBlockInfo,
TSKEY *primaryKeys, __block_search_fn_t searchFn, int32_t prevPosition) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// tumbling time window query, a special case of sliding time window query
if (pQuery->slidingTime == pQuery->intervalTime) {
// todo opt
}
getNextTimeWindow(pQuery, pNextWin);
GET_NEXT_TIMEWINDOW(pQuery, pNext);
// next time window is not in current block
if ((pNextWin->skey > pDataBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pNextWin->ekey < pDataBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQuery))) {
if ((pNext->skey > pDataBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pNext->ekey < pDataBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQuery))) {
return -1;
}
TSKEY startKey = -1;
if (QUERY_IS_ASC_QUERY(pQuery)) {
startKey = pNextWin->skey;
startKey = pNext->skey;
if (startKey < pQuery->window.skey) {
startKey = pQuery->window.skey;
}
} else {
startKey = pNextWin->ekey;
startKey = pNext->ekey;
if (startKey > pQuery->window.skey) {
startKey = pQuery->window.skey;
}
}
int32_t startPos = searchFn((char *)primaryKeys, pDataBlockInfo->rows, startKey, pQuery->order.order);
int32_t startPos = 0;
// tumbling time window query, a special case of sliding time window query
if (pQuery->slidingTime == pQuery->intervalTime && prevPosition != -1) {
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
startPos = prevPosition + factor;
} else {
startPos = searchFn((char *)primaryKeys, pDataBlockInfo->rows, startKey, pQuery->order.order);
}
/*
* This time window does not cover any data, try next time window,
* this case may happen when the time window is too small
*/
if (QUERY_IS_ASC_QUERY(pQuery) && primaryKeys[startPos] > pNextWin->ekey) {
if (QUERY_IS_ASC_QUERY(pQuery) && primaryKeys[startPos] > pNext->ekey) {
TSKEY next = primaryKeys[startPos];
pNextWin->ekey += ((next - pNextWin->ekey + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime;
pNextWin->skey = pNextWin->ekey - pQuery->intervalTime + 1;
} else if ((!QUERY_IS_ASC_QUERY(pQuery)) && primaryKeys[startPos] < pNextWin->skey) {
pNext->ekey += ((next - pNext->ekey + pQuery->slidingTime - 1)/pQuery->slidingTime) * pQuery->slidingTime;
pNext->skey = pNext->ekey - pQuery->intervalTime + 1;
} else if ((!QUERY_IS_ASC_QUERY(pQuery)) && primaryKeys[startPos] < pNext->skey) {
TSKEY next = primaryKeys[startPos];
pNextWin->skey -= ((pNextWin->skey - next + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime;
pNextWin->ekey = pNextWin->skey + pQuery->intervalTime - 1;
pNext->skey -= ((pNext->skey - next + pQuery->slidingTime - 1) / pQuery->slidingTime) * pQuery->slidingTime;
pNext->ekey = pNext->skey + pQuery->intervalTime - 1;
}
return startPos;
}
static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) {
static FORCE_INLINE TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) {
TSKEY ekey = -1;
if (QUERY_IS_ASC_QUERY(pQuery)) {
ekey = pWindow->ekey;
@ -924,20 +942,23 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
return;
}
int32_t forwardStep = 0;
int32_t startPos = pQuery->pos;
if (hasTimeWindow) {
TSKEY ekey = reviseWindowEkey(pQuery, &win);
int32_t forwardStep =
getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep, tsCols, pDataBlockInfo->rows);
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows);
}
int32_t index = pWindowResInfo->curIndex;
STimeWindow nextWin = win;
while (1) {
int32_t startPos = getNextQualifiedWindow(pRuntimeEnv, &nextWin, pDataBlockInfo, tsCols, searchFn);
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
startPos = getNextQualifiedWindow(pRuntimeEnv, &nextWin, pDataBlockInfo, tsCols, searchFn, prevEndPos);
if (startPos < 0) {
break;
}
@ -953,7 +974,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
TSKEY ekey = reviseWindowEkey(pQuery, &nextWin);
int32_t forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true);
SWindowStatus* pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows);
@ -1224,7 +1245,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
int32_t index = pWindowResInfo->curIndex;
while (1) {
getNextTimeWindow(pQuery, &nextWin);
GET_NEXT_TIMEWINDOW(pQuery, &nextWin);
if (/*pWindowResInfo->startTime > nextWin.skey ||*/
(nextWin.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(nextWin.skey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
@ -1236,7 +1257,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
// null data, failed to allocate more memory buffer
bool hasTimeWindow = false;
hasTimeWindow = false;
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) {
break;
}
@ -1459,11 +1480,13 @@ static void setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *p
}
}
static FORCE_INLINE void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery) {
static FORCE_INLINE void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery, char* buf) {
char* p = buf;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
assert(pQuery->pSelectExpr[i].interBytes <= DEFAULT_INTERN_BUF_PAGE_SIZE);
int32_t size = pQuery->pSelectExpr[i].interBytes;
setResultInfoBuf(&pResultInfo[i], size, isStableQuery, p);
setResultInfoBuf(&pResultInfo[i], pQuery->pSelectExpr[i].interBytes, isStableQuery);
p += size;
}
}
@ -1542,8 +1565,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
}
}
char* buf = calloc(1, pRuntimeEnv->interBufSize);
// set the intermediate result output buffer
setWindowResultInfo(pRuntimeEnv->resultInfo, pQuery, pRuntimeEnv->stableQuery);
setWindowResultInfo(pRuntimeEnv->resultInfo, pQuery, pRuntimeEnv->stableQuery, buf);
// if it is group by normal column, do not set output buffer, the output buffer is pResult
if (!isGroupbyNormalCol(pQuery->pGroupbyExpr) && !pRuntimeEnv->stableQuery) {
@ -1581,9 +1606,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tVariantDestroy(&pCtx->tag);
tfree(pCtx->tagInfo.pTagCtxList);
tfree(pRuntimeEnv->resultInfo[i].interResultBuf);
}
tfree(pRuntimeEnv->resultInfo[0].interResultBuf);
tfree(pRuntimeEnv->resultInfo);
tfree(pRuntimeEnv->pCtx);
}
@ -2017,14 +2042,6 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun
return true;
}
// previous time window may not be of the same size of pQuery->intervalTime
static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow) {
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
pTimeWindow->skey += (pQuery->slidingTime * factor);
pTimeWindow->ekey = pTimeWindow->skey + (pQuery->intervalTime - 1);
}
SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis) {
SQuery *pQuery = pRuntimeEnv->pQuery;
@ -2737,7 +2754,8 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery);
char* buf = calloc(1, pRuntimeEnv->interBufSize);
setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery, buf);
resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo);
int64_t lastTimestamp = -1;
@ -2823,11 +2841,9 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
tfree(pTree);
pQInfo->offset = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
tfree(pResultInfo[i].interResultBuf);
}
tfree(pResultInfo);
tfree(buf);
return pQInfo->numOfGroupResultPages;
}
@ -2980,14 +2996,16 @@ void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo) {
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo, size_t interBufSize) {
int32_t numOfCols = pQuery->numOfOutput;
pResultRow->resultInfo = calloc((size_t)numOfCols, sizeof(SResultInfo));
pResultRow->pos = *posInfo;
char* buf = calloc(1, interBufSize);
// set the intermediate result output buffer
setWindowResultInfo(pResultRow->resultInfo, pQuery, isSTableQuery);
setWindowResultInfo(pResultRow->resultInfo, pQuery, isSTableQuery, buf);
}
void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
@ -3365,7 +3383,7 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void
// set more initial size of interval/groupby query
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
int32_t initialSize = 20;
int32_t initialSize = 16;
int32_t initialThreshold = 100;
initWindowResInfo(&pTableQueryInfo->windowResInfo, pRuntimeEnv, initialSize, initialThreshold, TSDB_DATA_TYPE_INT);
} else { // in other aggregate query, do not initialize the windowResInfo
@ -3591,20 +3609,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) {
return loadPrimaryTS;
}
static int32_t getNumOfSubset(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t totalSubset = 0;
if (pQInfo->runtimeEnv.groupbyNormalCol || (QUERY_IS_INTERVAL_QUERY(pQuery))) {
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
} else {
totalSubset = GET_NUM_OF_TABLEGROUP(pQInfo);
}
return totalSubset;
}
static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orderType) {
static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_t orderType) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
@ -3613,17 +3618,18 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
int32_t step = -1;
qDebug("QInfo:%p start to copy data from windowResInfo to query buf", pQInfo);
int32_t totalSubset = getNumOfSubset(pQInfo);
int32_t totalSet = numOfClosedTimeWindow(pResultInfo);
SWindowResult* result = pResultInfo->pResult;
if (orderType == TSDB_ORDER_ASC) {
startIdx = pQInfo->groupIndex;
step = 1;
} else { // desc order copy all data
startIdx = totalSubset - pQInfo->groupIndex - 1;
startIdx = totalSet - pQInfo->groupIndex - 1;
step = -1;
}
for (int32_t i = startIdx; (i < totalSubset) && (i >= 0); i += step) {
for (int32_t i = startIdx; (i < totalSet) && (i >= 0); i += step) {
if (result[i].numOfRows == 0) {
pQInfo->offset = 0;
pQInfo->groupIndex += 1;
@ -3678,11 +3684,11 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
* @param pQInfo
* @param result
*/
void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) {
void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
int32_t numOfResult = doCopyToSData(pQInfo, result, orderType);
int32_t numOfResult = doCopyToSData(pQInfo, pResultInfo, orderType);
pQuery->rec.rows += numOfResult;
@ -4013,7 +4019,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
}
STimeWindow tw = win;
getNextTimeWindow(pQuery, &tw);
GET_NEXT_TIMEWINDOW(pQuery, &tw);
if (pQuery->limit.offset == 0) {
if ((tw.skey <= blockInfo.window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
@ -4025,7 +4031,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
tw = win;
int32_t startPos =
getNextQualifiedWindow(pRuntimeEnv, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey);
getNextQualifiedWindow(pRuntimeEnv, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
assert(startPos >= 0);
// set the abort info
@ -4068,7 +4074,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
tw = win;
int32_t startPos =
getNextQualifiedWindow(pRuntimeEnv, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey);
getNextQualifiedWindow(pRuntimeEnv, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
assert(startPos >= 0);
// set the abort info
@ -4197,7 +4203,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
type = TSDB_DATA_TYPE_INT; // group id
}
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type);
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 32, 4096, type);
}
} else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
@ -4505,7 +4511,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQInfo->groupIndex = 0;
ensureOutputBufferSimple(pRuntimeEnv, pWindowResInfo->size);
copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult);
copyFromWindowResToSData(pQInfo, pWindowResInfo);
pQInfo->groupIndex = currentGroupIndex; //restore the group index
assert(pQuery->rec.rows == pWindowResInfo->size);
@ -4520,7 +4526,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
* we need to return it to client in the first place.
*/
if (pQInfo->groupIndex > 0) {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
pQuery->rec.total += pQuery->rec.rows;
if (pQuery->rec.rows > 0) {
@ -4721,7 +4727,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
displayInterResult(pQuery->sdata, pRuntimeEnv, pQuery->sdata[0]->num);
#endif
} else {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
}
qDebug("QInfo:%p current:%"PRId64", total:%"PRId64"", pQInfo, pQuery->rec.rows, pQuery->rec.total);
@ -4772,7 +4778,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
#endif
}
} else { // not a interval query
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
}
// handle the limitation of output buffer
@ -4927,7 +4933,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
pQInfo->groupIndex = 0; // always start from 0
pQuery->rec.rows = 0;
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
}
@ -4956,7 +4962,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
if (pRuntimeEnv->groupbyNormalCol) { // todo refactor with merge interval time result
pQInfo->groupIndex = 0;
pQuery->rec.rows = 0;
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
}
@ -4988,7 +4994,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
pQInfo->groupIndex = 0; // always start from 0
if (pRuntimeEnv->windowResInfo.size > 0) {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
if (pQuery->rec.rows > 0) {
@ -5736,7 +5742,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
STimeWindow window = pQueryMsg->window;
taosArraySort(pTableIdList, compareTableIdInfo);
// TODO optimize the STableQueryInfo malloc strategy
pQInfo->runtimeEnv.interBufSize = getOutputInterResultBufSize(pQuery);
pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
int32_t index = 0;

View File

@ -17,15 +17,24 @@
#include "hash.h"
#include "taosmsg.h"
#include "qextbuffer.h"
#include "ttime.h"
#include "qfill.h"
#include "ttime.h"
#include "qExecutor.h"
#include "qUtil.h"
int32_t getOutputInterResultBufSize(SQuery* pQuery) {
int32_t size = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
assert(pQuery->pSelectExpr[i].interBytes <= DEFAULT_INTERN_BUF_PAGE_SIZE);
size += pQuery->pSelectExpr[i].interBytes;
}
assert(size > 0);
return size;
}
int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t size,
int32_t threshold, int16_t type) {
pWindowResInfo->capacity = size;
@ -43,7 +52,7 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
pWindowResInfo->pResult = calloc(threshold, sizeof(SWindowResult));
for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) {
SPosInfo posInfo = {-1, -1};
createQueryResultInfo(pRuntimeEnv->pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &posInfo);
createQueryResultInfo(pRuntimeEnv->pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &posInfo, pRuntimeEnv->interBufSize);
}
return TSDB_CODE_SUCCESS;
@ -54,11 +63,7 @@ void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) {
return;
}
// TODO opt malloc strategy
for (int32_t i = 0; i < nOutputCols; ++i) {
free(pWindowRes->resultInfo[i].interResultBuf);
}
free(pWindowRes->resultInfo[0].interResultBuf);
free(pWindowRes->resultInfo);
}
@ -241,10 +246,9 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
}
pWindowRes->numOfRows = 0;
// pWindowRes->nAlloc = 0;
pWindowRes->pos = (SPosInfo){-1, -1};
pWindowRes->status.closed = false;
pWindowRes->window = (STimeWindow){0, 0};
pWindowRes->window = TSWINDOW_INITIALIZER;
}
/**
@ -254,7 +258,6 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
*/
void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const SWindowResult *src) {
dst->numOfRows = src->numOfRows;
// dst->nAlloc = src->nAlloc;
dst->window = src->window;
dst->status = src->status;

View File

@ -1366,6 +1366,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
}
int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead);
/*
* only the compressed size is less than the value of contLen - overhead, the compression is applied
@ -1378,7 +1379,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
memcpy(pCont + overhead, buf, compLen);
pHead->comp = 1;
//tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen);
tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen);
finalLen = compLen + overhead;
} else {
finalLen = contLen;

View File

@ -606,6 +606,47 @@ class DbConnRest(DbConn):
print(self._result)
raise RuntimeError("TBD")
# Duplicate code from TDMySQL, TODO: merge all this into DbConnNative
class MyTDSql:
def __init__(self):
self.queryRows = 0
self.queryCols = 0
self.affectedRows = 0
def init(self, cursor, log=True):
self.cursor = cursor
# if (log):
# caller = inspect.getframeinfo(inspect.stack()[1][0])
# self.cursor.log(caller.filename + ".sql")
def close(self):
self.cursor.close()
def query(self, sql):
self.sql = sql
try:
self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
self.queryRows = len(self.queryResult)
self.queryCols = len(self.cursor.description)
except Exception as e:
# caller = inspect.getframeinfo(inspect.stack()[1][0])
# args = (caller.filename, caller.lineno, sql, repr(e))
# tdLog.exit("%s(%d) failed: sql:%s, %s" % args)
raise
return self.queryRows
def execute(self, sql):
self.sql = sql
try:
self.affectedRows = self.cursor.execute(sql)
except Exception as e:
# caller = inspect.getframeinfo(inspect.stack()[1][0])
# args = (caller.filename, caller.lineno, sql, repr(e))
# tdLog.exit("%s(%d) failed: sql:%s, %s" % args)
raise
return self.affectedRows
class DbConnNative(DbConn):
def __init__(self):
super().__init__()
@ -623,7 +664,7 @@ class DbConnNative(DbConn):
# self._cursor.execute('use db') # do this at the beginning of every step
# Open connection
self._tdSql = TDSql()
self._tdSql = MyTDSql()
self._tdSql.init(self._cursor)
def close(self):
@ -1213,9 +1254,15 @@ class Task():
self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err:
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme
if ( errno2 in [
if ( gConfig.continue_on_exception ): # user choose to continue
self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql))
self._err = err
elif ( errno2 in [
0x05, # TSDB_CODE_RPC_NOT_READY
0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503,
0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D,
0x381, 0x380, 0x383,
0x386, # DB is being dropped?!
0x503,
0x510, # vnode not in ready state
0x600,
1000 # REST catch-all error
@ -1227,8 +1274,8 @@ class Task():
errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)
self.logDebug(errMsg)
if gConfig.debug :
raise # so that we see full stack
else: # non-debug
# raise # so that we see full stack
traceback.print_exc()
print("\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
"----------------------------\n")
# sys.exit(-1)
@ -1239,6 +1286,11 @@ class Task():
self._err = e
self._aborted = True
traceback.print_exc()
except BaseException as e :
self.logInfo("Python base exception encountered")
self._err = e
self._aborted = True
traceback.print_exc()
except :
self.logDebug("[=] Unexpected exception, SQL: {}".format(self._lastSql))
raise
@ -1591,31 +1643,6 @@ class Dice():
raise RuntimeError("Cannot throw dice before seeding it")
return random.randrange(start, stop)
# Anyone needing to carry out work should simply come here
# class WorkDispatcher():
# def __init__(self, dbState):
# # self.totalNumMethods = 2
# self.tasks = [
# # CreateTableTask(dbState), # Obsolete
# # DropTableTask(dbState),
# # AddDataTask(dbState),
# ]
# def throwDice(self):
# max = len(self.tasks) - 1
# dRes = random.randint(0, max)
# # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes))
# return dRes
# def pickTask(self):
# dice = self.throwDice()
# return self.tasks[dice]
# def doWork(self, workerThread):
# task = self.pickTask()
# task.execute(workerThread)
class LoggingFilter(logging.Filter):
def filter(self, record: logging.LogRecord):
if ( record.levelno >= logging.INFO ) :
@ -1633,46 +1660,15 @@ class MyLoggingAdapter(logging.LoggerAdapter):
# return '[%s] %s' % (self.extra['connid'], msg), kwargs
class SvcManager:
MAX_QUEUE_SIZE = 10000
def __init__(self):
print("Starting service manager")
print("Starting TDengine Service Manager")
signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler)
signal.signal(signal.SIGUSR1, self.sigUsrHandler)
self.ioThread = None
self.subProcess = None
self.shouldStop = False
# self.status = MainExec.STATUS_RUNNING # set inside _startTaosService()
signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler!
def svcOutputReader(self, out: IO, queue):
# Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
print("This is the svcOutput Reader...")
# for line in out :
for line in iter(out.readline, b''):
# print("Finished reading a line: {}".format(line))
# print("Adding item to queue...")
line = line.decode("utf-8").rstrip()
queue.put(line) # This might block, and then causing "out" buffer to block
print("_i", end="", flush=True)
# Trim the queue if necessary
oneTenthQSize = self.MAX_QUEUE_SIZE // 10
if (queue.qsize() >= (self.MAX_QUEUE_SIZE - oneTenthQSize) ) : # 90% full?
print("Triming IPC queue by: {}".format(oneTenthQSize))
for i in range(0, oneTenthQSize) :
try:
queue.get_nowait()
except Empty:
break # break out of for loop, no more trimming
if self.shouldStop :
print("Stopping to read output from sub process")
break
# queue.put(line)
print("\nNo more output (most likely) from IO thread managing TDengine service") # meaning sub process must have died
out.close()
self.inSigHandler = False
# self._status = MainExec.STATUS_RUNNING # set inside _startTaosService()
self.svcMgrThread = None
def _doMenu(self):
choice = ""
@ -1695,10 +1691,10 @@ class SvcManager:
def sigUsrHandler(self, signalNumber, frame) :
print("Interrupting main thread execution upon SIGUSR1")
if self.status != MainExec.STATUS_RUNNING :
if self.inSigHandler : # already
print("Ignoring repeated SIG...")
return # do nothing if it's already not running
self.status = MainExec.STATUS_STOPPING
self.inSigHandler = True
choice = self._doMenu()
if choice == "1" :
@ -1711,67 +1707,227 @@ class SvcManager:
else:
raise RuntimeError("Invalid menu choice: {}".format(choice))
self.inSigHandler = False
def sigIntHandler(self, signalNumber, frame):
print("Sig INT Handler starting...")
if self.status != MainExec.STATUS_RUNNING :
if self.inSigHandler :
print("Ignoring repeated SIG_INT...")
return
self.inSigHandler = True
self.status = MainExec.STATUS_STOPPING # immediately set our status
self.stopTaosService()
print("INT signal handler returning...")
self.inSigHandler = False
def sigHandlerResume(self) :
print("Resuming TDengine service manager thread (main thread)...\n\n")
self.status = MainExec.STATUS_RUNNING
def joinIoThread(self):
if self.ioThread :
self.ioThread.join()
self.ioThread = None
def _checkServiceManagerThread(self):
if self.svcMgrThread: # valid svc mgr thread
if self.svcMgrThread.isStopped(): # done?
self.svcMgrThread.procIpcBatch() # one last time. TODO: appropriate?
self.svcMgrThread = None # no more
def _procIpcAll(self):
while self.svcMgrThread : # for as long as the svc mgr thread is still here
self.svcMgrThread.procIpcBatch() # regular processing,
time.sleep(0.5) # pause, before next round
self._checkServiceManagerThread()
print("Service Manager Thread (with subprocess) has ended, main thread now exiting...")
def startTaosService(self):
if self.svcMgrThread:
raise RuntimeError("Cannot start TAOS service when one may already be running")
self.svcMgrThread = ServiceManagerThread() # create the object
self.svcMgrThread.start()
print("TAOS service started, printing out output...")
self.svcMgrThread.procIpcBatch(trimToTarget=10, forceOutput=True) # for printing 10 lines
print("TAOS service started")
def stopTaosService(self, outputLines = 20):
print("Terminating Service Manager Thread (SMT) execution...")
if not self.svcMgrThread:
raise RuntimeError("Unexpected empty svc mgr thread")
self.svcMgrThread.stop()
if self.svcMgrThread.isStopped():
self.svcMgrThread.procIpcBatch(outputLines) # one last time
self.svcMgrThread = None
print("----- End of TDengine Service Output -----\n")
print("SMT execution terminated")
else:
print("WARNING: SMT did not terminate as expected")
def run(self):
self.startTaosService()
self._procIpcAll() # pump/process all the messages
if self.svcMgrThread: # if sig handler hasn't destroyed it by now
self.stopTaosService() # should have started already
class ServiceManagerThread:
MAX_QUEUE_SIZE = 10000
def __init__(self):
self._tdeSubProcess = None
self._thread = None
self._status = None
def getStatus(self):
return self._status
def isRunning(self):
# return self._thread and self._thread.is_alive()
return self._status == MainExec.STATUS_RUNNING
def isStopping(self):
return self._status == MainExec.STATUS_STOPPING
def isStopped(self):
return self._status == MainExec.STATUS_STOPPED
# Start the thread (with sub process), and wait for the sub service
# to become fully operational
def start(self):
if self._thread :
raise RuntimeError("Unexpected _thread")
if self._tdeSubProcess :
raise RuntimeError("TDengine sub process already created/running")
self._status = MainExec.STATUS_STARTING
self._tdeSubProcess = TdeSubProcess()
self._tdeSubProcess.start()
self._ipcQueue = Queue()
self._thread = threading.Thread(
target=self.svcOutputReader,
args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
self._thread.daemon = True # thread dies with the program
self._thread.start()
# wait for service to start
for i in range(0, 10) :
time.sleep(1.0)
# self.procIpcBatch() # don't pump message during start up
print("_zz_", end="", flush=True)
if self._status == MainExec.STATUS_RUNNING :
logger.info("[] TDengine service READY to process requests")
return # now we've started
raise RuntimeError("TDengine service did not start successfully") # TODO: handle this better?
def stop(self):
# can be called from both main thread or signal handler
print("Terminating TDengine service running as the sub process...")
if self.isStopped():
print("Service already stopped")
return
if self.isStopping():
print("Service is already being stopped")
return
# Linux will send Control-C generated SIGINT to the TDengine process already, ref: https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
if not self._tdeSubProcess :
raise RuntimeError("sub process object missing")
self._status = MainExec.STATUS_STOPPING
self._tdeSubProcess.stop()
if self._tdeSubProcess.isRunning(): # still running
print("FAILED to stop sub process, it is still running... pid = {}".format(self.subProcess.pid))
else:
self._tdeSubProcess = None # not running any more
self.join() # stop the thread, change the status, etc.
def join(self):
# TODO: sanity check
if not self.isStopping():
raise RuntimeError("Unexpected status when ending svc mgr thread: {}".format(self._status))
if self._thread :
self._thread.join()
self._thread = None
self._status = MainExec.STATUS_STOPPED
else :
print("Joining empty thread, doing nothing")
def _trimQueue(self, targetSize):
if targetSize <= 0:
return # do nothing
q = self._ipcQueue
if (q.qsize() <= targetSize ) : # no need to trim
return
logger.debug("Triming IPC queue to target size: {}".format(targetSize))
itemsToTrim = q.qsize() - targetSize
for i in range(0, itemsToTrim) :
try:
q.get_nowait()
except Empty:
break # break out of for loop, no more trimming
TD_READY_MSG = "TDengine is initialized successfully"
def _procIpcBatch(self):
def procIpcBatch(self, trimToTarget = 0, forceOutput = False):
self._trimQueue(trimToTarget) # trim if necessary
# Process all the output generated by the underlying sub process, managed by IO thread
print("<", end="", flush=True)
while True :
try:
line = self.ipcQueue.get_nowait() # getting output at fast speed
print("_o", end="", flush=True)
if self.status == MainExec.STATUS_STARTING : # we are starting, let's see if we have started
if line.find(self.TD_READY_MSG) != -1 : # found
self.status = MainExec.STATUS_RUNNING
line = self._ipcQueue.get_nowait() # getting output at fast speed
self._printProgress("_o")
except Empty:
# time.sleep(2.3) # wait only if there's no output
# no more output
print(".>", end="", flush=True)
return # we are done with THIS BATCH
else: # got line
print(line)
def _procIpcAll(self):
while True :
print("<", end="", flush=True)
self._procIpcBatch() # process one batch
# check if the ioThread is still running
if (not self.ioThread) or (not self.ioThread.is_alive()):
print("IO Thread (with subprocess) has ended, main thread now exiting...")
self.stopTaosService()
self._procIpcBatch() # one more batch
return # TODO: maybe one last batch?
# Maybe handler says we should exit now
if self.shouldStop:
print("Main thread ending all IPC processing with IOThread/SubProcess")
self._procIpcBatch() # one more batch
return
else: # got line, printing out
if forceOutput:
logger.info(line)
else:
logger.debug(line)
print(">", end="", flush=True)
time.sleep(0.5)
def startTaosService(self):
_ProgressBars = ["--", "//", "||", "\\\\"]
def _printProgress(self, msg): # TODO: assuming 2 chars
print(msg, end="", flush=True)
pBar = self._ProgressBars[Dice.throw(4)]
print(pBar, end="", flush=True)
print('\b\b\b\b', end="", flush=True)
def svcOutputReader(self, out: IO, queue):
# Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
# print("This is the svcOutput Reader...")
# for line in out :
for line in iter(out.readline, b''):
# print("Finished reading a line: {}".format(line))
# print("Adding item to queue...")
line = line.decode("utf-8").rstrip()
queue.put(line) # This might block, and then causing "out" buffer to block
self._printProgress("_i")
if self._status == MainExec.STATUS_STARTING : # we are starting, let's see if we have started
if line.find(self.TD_READY_MSG) != -1 : # found
self._status = MainExec.STATUS_RUNNING
# Trim the queue if necessary: TODO: try this 1 out of 10 times
self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10) # trim to 90% size
if self.isStopping() : # TODO: use thread status instead
print("_w", end="", flush=True) # WAITING for stopping sub process to finish its outptu
# queue.put(line)
print("\nNo more output from IO thread managing TDengine service") # meaning sub process must have died
out.close()
class TdeSubProcess:
def __init__(self):
self.subProcess = None
def getStdOut(self):
return self.subProcess.stdout
def isRunning(self):
return self.subProcess != None
def start(self):
ON_POSIX = 'posix' in sys.builtin_module_names
svcCmd = ['../../build/build/bin/taosd', '-c', '../../build/test/cfg']
# svcCmd = ['vmstat', '1']
@ -1783,40 +1939,17 @@ class SvcManager:
stdout=subprocess.PIPE,
# bufsize=1, # not supported in binary mode
close_fds=ON_POSIX) # had text=True, which interferred with reading EOF
self.ipcQueue = Queue()
if self.ioThread :
raise RuntimeError("Corrupt thread state")
self.ioThread = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, self.ipcQueue))
self.ioThread.daemon = True # thread dies with the program
self.ioThread.start()
self.shouldStop = False # don't let the main loop stop
self.status = MainExec.STATUS_STARTING
# wait for service to start
for i in range(0, 10) :
time.sleep(1.0)
self._procIpcBatch() # pump messages
print("_zz_", end="", flush=True)
if self.status == MainExec.STATUS_RUNNING :
print("TDengine service READY to process requests")
return # now we've started
raise RuntimeError("TDengine service did not start successfully") # TODO: handle this better?
def stopTaosService(self):
# can be called from both main thread or signal handler
print("Terminating TDengine service running as the sub process...")
# Linux will send Control-C generated SIGINT to the TDengine process already, ref: https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
def stop(self):
if not self.subProcess:
print("Process already stopped")
print("Sub process already stopped")
return
retCode = self.subProcess.poll()
if retCode : # valid return code, process ended
self.subProcess = None
else: # process still alive, let's interrupt it
print("Sub process still running, sending SIG_INT and waiting for it to stop...")
print("Sub process is running, sending SIG_INT and waiting for it to terminate...")
self.subProcess.send_signal(signal.SIGINT) # sub process should end, then IPC queue should end, causing IO thread to end
try :
self.subProcess.wait(10)
@ -1826,41 +1959,20 @@ class SvcManager:
print("TDengine service process terminated successfully from SIG_INT")
self.subProcess = None
if self.subProcess and (not self.subProcess.poll()):
print("Sub process is still running... pid = {}".format(self.subProcess.pid))
self.shouldStop = True
self.joinIoThread()
def run(self):
self.startTaosService()
# proc = subprocess.Popen(['echo', '"to stdout"'],
# stdout=subprocess.PIPE,
# )
# stdout_value = proc.communicate()[0]
# print('\tstdout: {}'.format(repr(stdout_value)))
self._procIpcAll()
print("End of loop reading from IPC queue")
self.joinIoThread() # should have started already
print("SvcManager Run Finished")
class ClientManager:
def __init__(self):
print("Starting service manager")
signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler)
self.status = MainExec.STATUS_RUNNING
self._status = MainExec.STATUS_RUNNING
self.tc = None
def sigIntHandler(self, signalNumber, frame):
if self.status != MainExec.STATUS_RUNNING :
if self._status != MainExec.STATUS_RUNNING :
print("Ignoring repeated SIGINT...")
return # do nothing if it's already not running
self.status = MainExec.STATUS_STOPPING # immediately set our status
self._status = MainExec.STATUS_STOPPING # immediately set our status
print("Terminating program...")
self.tc.requestToStop()
@ -1904,16 +2016,16 @@ class ClientManager:
self._printLastNumbers()
dbManager = DbManager() # Regular function
Dice.seed(0) # initial seeding of dice
thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
self.tc = ThreadCoordinator(thPool, dbManager)
self.tc.run()
# print("exec stats: {}".format(self.tc.getExecStats()))
# print("TC failed = {}".format(self.tc.isFailed()))
self.conclude()
if gConfig.auto_start_service :
svcMgr.stopTaosService()
# Print exec status, etc., AFTER showing messages from the server
self.conclude()
# print("TC failed (2) = {}".format(self.tc.isFailed()))
return 1 if self.tc.isFailed() else 0 # Linux return code: ref https://shapeshed.com/unix-exit-codes/
@ -1926,7 +2038,7 @@ class MainExec:
STATUS_STARTING = 1
STATUS_RUNNING = 2
STATUS_STOPPING = 3
# STATUS_STOPPED = 3 # Not used yet
STATUS_STOPPED = 4
@classmethod
def runClient(cls):
@ -2012,14 +2124,12 @@ def main():
help='Maximum number of steps to run (default: 100)')
parser.add_argument('-t', '--num-threads', action='store', default=5, type=int,
help='Number of threads to run (default: 10)')
parser.add_argument('-x', '--continue-on-exception', action='store_true',
help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')
global gConfig
gConfig = parser.parse_args()
# if len(sys.argv) == 1:
# parser.print_help()
# sys.exit()
# Logging Stuff
global logger
_logger = logging.getLogger('CrashGen') # real logger
@ -2034,15 +2144,14 @@ def main():
else:
logger.setLevel(logging.INFO)
Dice.seed(0) # initial seeding of dice
# Run server or client
if gConfig.run_tdengine : # run server
MainExec.runService()
else :
return MainExec.runClient()
# logger.info("Crash_Gen execution finished")
if __name__ == "__main__":
exitCode = main()
# print("Exiting with code: {}".format(exitCode))

View File

@ -0,0 +1,94 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
import csv
import random
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.csvfile = "/tmp/file.csv"
self.rows = 10000
self.ntables = 1
self.startTime = 1520000010000
def genRandomStr(self, maxLen):
H = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
salt = ''
if maxLen <= 1:
maxLen = 2
l = random.randint(1,maxLen)
for i in range(l):
salt += random.choice(H)
return salt
def createCSVFile(self):
f = open(self.csvfile,'w',encoding='utf-8')
csv_writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC)
for i in range(self.rows):
csv_writer.writerow([self.startTime + i,
self.genRandomStr(5),
self.genRandomStr(6),
self.genRandomStr(7),
self.genRandomStr(8),
self.genRandomStr(9),
self.genRandomStr(10),
self.genRandomStr(11),
self.genRandomStr(12),
self.genRandomStr(13),
self.genRandomStr(14)])
f.close()
def destroyCSVFile(self):
os.remove(self.csvfile)
def run(self):
self.createCSVFile()
tdDnodes.stop(1)
tdDnodes.deploy(1)
tdDnodes.start(1)
tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db')
tdSql.execute('create database db')
tdSql.execute('use db')
tdSql.execute('''create table tbx (ts TIMESTAMP,
collect_area NCHAR(5),
device_id BINARY(6),
imsi BINARY(7),
imei BINARY(8),
mdn BINARY(9),
net_type BINARY(10),
mno NCHAR(11),
province NCHAR(12),
city NCHAR(13),
alarm BINARY(14))''')
tdSql.execute("import into tbx file \'%s\'"%(self.csvfile))
tdSql.query('select * from tbx')
tdSql.checkRows(self.rows)
def stop(self):
self.destroyCSVFile()
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdDnodes.stop(1)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -22,23 +22,47 @@ class TDSimClient:
def __init__(self):
self.testCluster = False
self.cfgDict = {
"numOfLogLines": "100000000",
"numOfThreadsPerCore": "2.0",
"locale": "en_US.UTF-8",
"charset": "UTF-8",
"asyncLog": "0",
"anyIp": "0",
"sdbDebugFlag": "135",
"rpcDebugFlag": "135",
"tmrDebugFlag": "131",
"cDebugFlag": "135",
"udebugFlag": "135",
"jnidebugFlag": "135",
"qdebugFlag": "135",
}
def init(self, path):
self.__init__()
self.path = path
def getLogDir(self):
self.logDir = "%s/sim/psim/log" % (self.path)
return self.logDir
def getCfgDir(self):
self.cfgDir = "%s/sim/psim/cfg" % (self.path)
return self.cfgDir
def setTestCluster(self, value):
self.testCluster = value
def addExtraCfg(self, option, value):
self.cfgDict.update({option: value})
def cfg(self, option, value):
cmd = "echo '%s %s' >> %s" % (option, value, self.cfgPath)
if os.system(cmd) != 0:
tdLog.exit(cmd)
def deploy(self):
self.logDir = "%s/sim/psim/log" % (self.path,)
self.logDir = "%s/sim/psim/log" % (self.path)
self.cfgDir = "%s/sim/psim/cfg" % (self.path)
self.cfgPath = "%s/sim/psim/cfg/taos.cfg" % (self.path)
@ -46,11 +70,11 @@ class TDSimClient:
if os.system(cmd) != 0:
tdLog.exit(cmd)
cmd = "rm -rf " + self.cfgDir
cmd = "mkdir -p " + self.logDir
if os.system(cmd) != 0:
tdLog.exit(cmd)
cmd = "mkdir -p " + self.logDir
cmd = "rm -rf " + self.cfgDir
if os.system(cmd) != 0:
tdLog.exit(cmd)
@ -66,19 +90,10 @@ class TDSimClient:
self.cfg("masterIp", "192.168.0.1")
self.cfg("secondIp", "192.168.0.2")
self.cfg("logDir", self.logDir)
self.cfg("numOfLogLines", "100000000")
self.cfg("numOfThreadsPerCore", "2.0")
self.cfg("locale", "en_US.UTF-8")
self.cfg("charset", "UTF-8")
self.cfg("asyncLog", "0")
self.cfg("anyIp", "0")
self.cfg("sdbDebugFlag", "135")
self.cfg("rpcDebugFlag", "135")
self.cfg("tmrDebugFlag", "131")
self.cfg("cDebugFlag", "135")
self.cfg("udebugFlag", "135")
self.cfg("jnidebugFlag", "135")
self.cfg("qdebugFlag", "135")
for key, value in self.cfgDict.items():
self.cfg(key, value)
tdLog.debug("psim is deployed and configured by %s" % (self.cfgPath))
@ -378,6 +393,9 @@ class TDDnodes:
for i in range(len(self.dnodes)):
self.dnodes[i].init(self.path)
self.sim = TDSimClient()
self.sim.init(self.path)
def setTestCluster(self, value):
self.testCluster = value
@ -385,8 +403,6 @@ class TDDnodes:
self.valgrind = value
def deploy(self, index):
self.sim = TDSimClient()
self.sim.init(self.path)
self.sim.setTestCluster(self.testCluster)
if (self.simDeployed == False):
@ -474,5 +490,11 @@ class TDDnodes:
def getSimCfgPath(self):
return self.sim.getCfgDir()
def getSimLogPath(self):
return self.sim.getLogDir()
def addSimExtraCfg(self, option, value):
self.sim.addExtraCfg(option, value)
tdDnodes = TDDnodes()

View File

@ -71,7 +71,8 @@ class TDSql:
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.exit("%s(%d) failed: sql:%s, %s" % args)
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
return self.queryRows
def waitedQuery(self, sql, expectRows, timeout):
@ -89,7 +90,8 @@ class TDSql:
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.exit("%s(%d) failed: sql:%s, %s" % args)
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
return (self.queryRows, timeout)
def checkRows(self, expectRows):
@ -158,7 +160,8 @@ class TDSql:
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.exit("%s(%d) failed: sql:%s, %s" % args)
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
return self.affectedRows
def checkAffectedRows(self, expectAffectedRows):

View File

@ -1,7 +1,7 @@
system sh/stop_dnodes.sh
system sh/ip.sh -i 1 -s up
system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1
system sh/cfg.sh -n dnode1 -c commitLog -v 0
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sql connect

View File

@ -1,3 +1,4 @@
sleep 2000
run general/parser/alter.sim
sleep 2000
run general/parser/alter1.sim
@ -7,7 +8,6 @@ sleep 2000
run general/parser/auto_create_tb.sim
sleep 2000
run general/parser/auto_create_tb_drop_tb.sim
sleep 2000
run general/parser/col_arithmetic_operation.sim
sleep 2000
@ -23,77 +23,81 @@ run general/parser/create_tb.sim
sleep 2000
run general/parser/dbtbnameValidate.sim
sleep 2000
run general/parser/fill.sim
sleep 2000
run general/parser/fill_stb.sim
sleep 2000
#run general/parser/fill_us.sim #
sleep 2000
run general/parser/first_last.sim
sleep 2000
run general/parser/import_commit1.sim
sleep 2000
run general/parser/import_commit2.sim
sleep 2000
run general/parser/import_commit3.sim
sleep 2000
run general/parser/insert_tb.sim
sleep 2000
run general/parser/first_last.sim
sleep 2000
#run general/parser/import_file.sim
sleep 2000
run general/parser/lastrow.sim
sleep 2000
run general/parser/nchar.sim
sleep 2000
#run general/parser/null_char.sim
sleep 2000
run general/parser/single_row_in_tb.sim
sleep 2000
run general/parser/select_from_cache_disk.sim
sleep 2000
run general/parser/selectResNum.sim
sleep 2000
run general/parser/mixed_blocks.sim
sleep 2000
run general/parser/limit1.sim
sleep 2000
run general/parser/limit.sim
sleep 2000
run general/parser/limit1_tblocks100.sim
sleep 2000
run general/parser/select_across_vnodes.sim
sleep 2000
run general/parser/slimit1.sim
sleep 2000
run general/parser/tbnameIn.sim
sleep 2000
run general/parser/projection_limit_offset.sim
sleep 2000
run general/parser/limit2.sim
sleep 2000
run general/parser/fill.sim
sleep 2000
run general/parser/fill_stb.sim
sleep 2000
run general/parser/where.sim
sleep 2000
run general/parser/slimit.sim
sleep 2000
run general/parser/select_with_tags.sim
sleep 2000
run general/parser/interp.sim
run general/parser/insert_tb.sim
sleep 2000
run general/parser/tags_dynamically_specifiy.sim
sleep 2000
run general/parser/groupby.sim
run general/parser/interp.sim
sleep 2000
run general/parser/lastrow.sim
sleep 2000
run general/parser/limit.sim
sleep 2000
run general/parser/limit1.sim
sleep 2000
run general/parser/limit1_tblocks100.sim
sleep 2000
run general/parser/limit2.sim
sleep 2000
run general/parser/mixed_blocks.sim
sleep 2000
run general/parser/nchar.sim
sleep 2000
run general/parser/null_char.sim
sleep 2000
run general/parser/selectResNum.sim
sleep 2000
run general/parser/select_across_vnodes.sim
sleep 2000
run general/parser/select_from_cache_disk.sim
sleep 2000
run general/parser/set_tag_vals.sim
sleep 2000
run general/parser/single_row_in_tb.sim
sleep 2000
run general/parser/slimit.sim
sleep 2000
run general/parser/slimit1.sim
sleep 2000
run general/parser/slimit_alter_tags.sim
sleep 2000
run general/parser/tbnameIn.sim
sleep 2000
run general/parser/slimit_alter_tags.sim # persistent failed
sleep 2000
run general/parser/join.sim
sleep 2000
run general/parser/join_multivnode.sim
sleep 2000
run general/parser/repeatAlter.sim
run general/parser/projection_limit_offset.sim
sleep 2000
run general/parser/binary_escapeCharacter.sim
run general/parser/select_with_tags.sim
sleep 2000
run general/parser/bug.sim
run general/parser/groupby.sim
sleep 2000
run general/parser/union.sim
sleep 2000
run general/parser/sliding.sim
sleep 2000
run general/parser/fill_us.sim
sleep 2000
run general/parser/tags_filter.sim
#sleep 2000
#run general/parser/repeatStream.sim

View File

@ -1,5 +1,21 @@
#system sh/stop_dnodes.sh
#system sh/deploy.sh -n dnode1 -i 1
#system sh/cfg.sh -n dnode5 -c maxtablesPerVnode -v 10000
#system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 256
#system sh/exec.sh -n dnode1 -s start
#sql connect
#$db = db1
#sql create database $db
#sql use $db
#$stb = stb1
#sql create table $stb (ts timestamp, c1 int) tags(t1 int, t2 binary(8))
$tblStart = 0
$tblEnd = 10000
$tblEnd = 1000
$tsStart = 1325347200000 # 2012-01-01 00:00:00.000
###############################################################
@ -10,7 +26,6 @@ $stb = stb1
sql use $db
######sql create table $stb (ts timestamp, c1 int) tags(t1 int, t2 binary(8))
$tagPrex = ' . tag
@ -19,12 +34,15 @@ while $i < $tblEnd
$tb = tb . $i
$tagBinary = $tagPrex . $i
$tagBinary = $tagBinary . '
# print create table if not exists $tb using $stb tags ( $i , $tagBinary )
sql create table if not exists $tb using $stb tags ( $i , $tagBinary )
$i = $i + 1
endw
print ====================== client1_0 create table end, start insert data ............
sql select count(tbname) from $stb
print select count(tbname) from $stb
print data00 $data00
$rowsPerLoop = 100
$ts = $tsStart
@ -54,3 +72,4 @@ while $i < $tblEnd
print ====================== client1_0 insert data complete once ............
endi
endw
print ====================== client1_0 success and auto end =====================

View File

@ -3,41 +3,49 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/deploy.sh -n dnode5 -i 5
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode3 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode4 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode5 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/cfg.sh -n dnode2 -c walLevel -v 1
system sh/cfg.sh -n dnode3 -c walLevel -v 1
system sh/cfg.sh -n dnode4 -c walLevel -v 1
system sh/cfg.sh -n dnode5 -c walLevel -v 1
system sh/cfg.sh -n dnode1 -c balanceInterval -v 10
system sh/cfg.sh -n dnode2 -c balanceInterval -v 10
system sh/cfg.sh -n dnode3 -c balanceInterval -v 10
system sh/cfg.sh -n dnode4 -c balanceInterval -v 10
system sh/cfg.sh -n dnode5 -c balanceInterval -v 10
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 256
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 256
system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 256
system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 256
system sh/cfg.sh -n dnode5 -c numOfTotalVnodes -v 256
system sh/cfg.sh -n dnode1 -c alternativeRole -v 0
system sh/cfg.sh -n dnode2 -c alternativeRole -v 0
system sh/cfg.sh -n dnode3 -c alternativeRole -v 0
system sh/cfg.sh -n dnode4 -c alternativeRole -v 0
system sh/cfg.sh -n dnode5 -c alternativeRole -v 0
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 1000
system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v 1000
system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v 1000
system sh/cfg.sh -n dnode4 -c maxtablesPerVnode -v 1000
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 5000
system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v 5000
system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v 5000
system sh/cfg.sh -n dnode4 -c maxtablesPerVnode -v 5000
system sh/cfg.sh -n dnode5 -c maxtablesPerVnode -v 5000
system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode4 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode5 -c arbitrator -v $arbitrator
print ============== step0: start tarbitrator
system sh/exec_tarbitrator.sh -s start
@ -64,407 +72,159 @@ $stb = stb1
sql create table $stb (ts timestamp, c1 int) tags(t1 int, t2 binary(8))
print ============== step4: start 10 client1/ 10 client2/ 10 client3/ 10 client4/ 1 client5
#run_back unique/cluster/client_test.sim
run_back unique/cluster/client1_0.sim
run_back unique/cluster/client1_1.sim
run_back unique/cluster/client1_2.sim
run_back unique/cluster/client1_3.sim
run_back unique/cluster/client2_0.sim
run_back unique/cluster/client2_1.sim
run_back unique/cluster/client2_2.sim
run_back unique/cluster/client2_3.sim
run_back unique/cluster/client3.sim
run_back unique/cluster/client4.sim
#run_back unique/cluster/client1_1.sim
#run_back unique/cluster/client1_2.sim
#run_back unique/cluster/client1_3.sim
#run_back unique/cluster/client2_0.sim
#run_back unique/cluster/client2_1.sim
#run_back unique/cluster/client2_2.sim
#run_back unique/cluster/client2_3.sim
#run_back unique/cluster/client3.sim
#run_back unique/cluster/client4.sim
sleep 20000
wait_subsim_insert_complete_create_tables:
sql select count(tbname) from $stb
print select count(tbname) from $stb
print data00 $data00
if $data00 < 1000 then
sleep 3000
goto wait_subsim_insert_complete_create_tables
endi
wait_subsim_insert_data:
print select count(*) from $stb
sql select count(*) from $stb
print data00 $data00
if $data00 < 1 then
if $data00 < 1000 then
sleep 3000
goto wait_subsim_insert_data
endi
print wait for a while to let clients start insert data
sleep 5000
$loop_cnt = 0
loop_cluster_do:
print **** **** **** START loop cluster do (loop_cnt: $loop_cnt )**** **** **** ****
print ============== step5: start dnode4 and add into cluster, then wait dnode4 ready
print ============== step5: start dnode4/dnode5 and add into cluster, then wait ready
system sh/exec.sh -n dnode4 -s start
system sh/exec.sh -n dnode5 -s start
sql create dnode $hostname4
sql create dnode $hostname5
wait_dnode4_ready_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 4 then
sleep 2000
goto wait_dnode4_ready_0
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
#$dnode4Status = $data4_4
if $loop_cnt == 0 then
$dnode4Status = $data4_4
elif $loop_cnt == 1 then
$dnode4Status = $data4_6
elif $loop_cnt == 2 then
$dnode4Status = $data4_8
else then
print **** **** **** END loop cluster do (loop_cnt: $loop_cnt )**** **** **** ****
return
endi
if $dnode4Status != ready then
sleep 2000
goto wait_dnode4_ready_0
endi
sleep 5000
print ============== step6: stop and drop dnode1, then remove data dir of dnode1
system sh/exec.sh -n dnode1 -s stop -x SIGINT
$cnt = 0
wait_dnode1_offline_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 4 then
sleep 2000
goto wait_dnode1_offline_0
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$dnode2Status = $data4_2
$dnode3Status = $data4_3
$dnode4Status = $data4_4
if $loop_cnt == 0 then
$dnode1Status = $data4_1
elif $loop_cnt == 1 then
$dnode1Status = $data4_5
elif $loop_cnt == 2 then
$dnode1Status = $data4_7
elif $loop_cnt == 3 then
$dnode1Status = $data4_9
else then
print **** **** **** END loop cluster do (loop_cnt: $loop_cnt )**** **** **** ****
return
endi
if $dnode1Status != offline then
sleep 2000
goto wait_dnode1_offline_0
endi
$cnt = 0
wait_mnode1_offline_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
print show mnodes
sql show mnodes
if $rows != 3 then
sleep 2000
goto wait_mnode1_offline_0
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$mnode1Status = $data2_1
$mnode2Status = $data2_2
$mnode3Status = $data2_3
$mnode4Status = $data2_4
if $loop_cnt == 0 then
$mnode1Status = $data2_1
elif $loop_cnt == 1 then
$mnode1Status = $data2_5
elif $loop_cnt == 2 then
$mnode1Status = $data2_7
elif $loop_cnt == 3 then
$mnode1Status = $data2_9
else then
print **** **** **** END loop cluster do (loop_cnt: $loop_cnt )**** **** **** ****
return
endi
if $mnode1Status != offline then
sleep 2000
goto wait_mnode1_offline_0
endi
sleep 5000
sql drop dnode $hostname1
sleep 5000
system rm -rf ../../../sim/dnode1/data
sleep 20000
$cnt = 0
wait_mnode4_slave_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
print show mnodes
sql show mnodes
if $rows != 3 then
sleep 2000
goto wait_mnode4_slave_0
endi
print show mnodes
print rows: $rows
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$mnode1Status = $data2_1
$mnode2Status = $data2_2
$mnode3Status = $data2_3
$mnode4Status = $data2_4
print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5
print $data0_6 $data1_6 $data2_6 $data3_6 $data4_6
print $data0_7 $data1_7 $data2_7 $data3_7 $data4_7
print $data0_8 $data1_8 $data2_8 $data3_8 $data4_8
print $data0_9 $data1_9 $data2_9 $data3_9 $data4_9
return -1
if $loop_cnt == 0 then
$mnode4Status = $data2_4
elif $loop_cnt == 1 then
$mnode4Status = $data2_6
elif $loop_cnt == 2 then
$mnode4Status = $data2_8
else then
print **** **** **** END loop cluster do (loop_cnt: $loop_cnt )**** **** **** ****
return
endi
if $mnode4Status != slave then
sleep 2000
goto wait_mnode4_slave_0
endi
print ============== step7: stop dnode2, waiting dnode4
print ============== step7: stop dnode2
system sh/exec.sh -n dnode2 -s stop -x SIGINT
sleep 5000
$cnt = 0
wait_dnode2_offline_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode2_offline_0
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
$dnode4Status = $data4_4
if $dnode2Status != offline then
sleep 2000
goto wait_dnode2_offline_0
endi
sleep 3000
print show mnodes
sql show mnodes
print show mnodes
print rows: $rows
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5
print $data0_6 $data1_6 $data2_6 $data3_6 $data4_6
print $data0_7 $data1_7 $data2_7 $data3_7 $data4_7
print $data0_8 $data1_8 $data2_8 $data3_8 $data4_8
print $data0_9 $data1_9 $data2_9 $data3_9 $data4_9
print ============== step8: restart dnode2, then wait sync end
system sh/exec.sh -n dnode2 -s start
$cnt = 0
wait_dnode2_ready_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode2_ready_0
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
$dnode4Status = $data4_4
if $dnode2Status != ready then
sleep 2000
goto wait_dnode2_ready_0
endi
sleep 3000
print show mnodes
sleep 20000
sql show mnodes
print show mnodes
print rows: $rows
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5
print $data0_6 $data1_6 $data2_6 $data3_6 $data4_6
print $data0_7 $data1_7 $data2_7 $data3_7 $data4_7
print $data0_8 $data1_8 $data2_8 $data3_8 $data4_8
print $data0_9 $data1_9 $data2_9 $data3_9 $data4_9
print ============== step9: stop dnode3, then wait sync end
system sh/exec.sh -n dnode3 -s stop -x SIGINT
sleep 3000
$cnt = 0
wait_dnode3_offline_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode3_offline_0
endi
sleep 20000
sql show mnodes
print show mnodes
print rows: $rows
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
$dnode4Status = $data4_4
if $dnode3Status != offline then
sleep 2000
goto wait_dnode3_offline_0
endi
print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5
print $data0_6 $data1_6 $data2_6 $data3_6 $data4_6
print $data0_7 $data1_7 $data2_7 $data3_7 $data4_7
print $data0_8 $data1_8 $data2_8 $data3_8 $data4_8
print $data0_9 $data1_9 $data2_9 $data3_9 $data4_9
print ============== step10: restart dnode3, then wait sync end
system sh/exec.sh -n dnode3 -s start
sleep 3000
$cnt = 0
wait_dnode3_ready_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode3_ready_0
endi
sleep 20000
sql show mnodes
print show mnodes
print rows: $rows
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
$dnode4Status = $data4_4
if $dnode3Status != ready then
sleep 2000
goto wait_dnode3_ready_0
endi
print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5
print $data0_6 $data1_6 $data2_6 $data3_6 $data4_6
print $data0_7 $data1_7 $data2_7 $data3_7 $data4_7
print $data0_8 $data1_8 $data2_8 $data3_8 $data4_8
print $data0_9 $data1_9 $data2_9 $data3_9 $data4_9
print ============== step11: stop dnode4, then wait sync end
system sh/exec.sh -n dnode4 -s stop -x SIGINT
sleep 3000
$cnt = 0
wait_dnode4_offline_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode4_offline_0
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
#$dnode4Status = $data4_4
if $loop_cnt == 0 then
$dnode4Status = $data4_4
elif $loop_cnt == 1 then
$dnode4Status = $data4_6
elif $loop_cnt == 2 then
$dnode4Status = $data4_8
else then
print **** **** **** END loop cluster do (loop_cnt: $loop_cnt )**** **** **** ****
return
endi
if $dnode4Status != offline then
sleep 2000
goto wait_dnode4_offline_0
endi
sleep 20000
print ============== step12: restart dnode4, then wait sync end
system sh/exec.sh -n dnode4 -s start
sleep 3000
$cnt = 0
wait_dnode4_ready_1:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode4_ready_1
endi
sleep 20000
sql show mnodes
print show mnodes
print rows: $rows
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
#$dnode4Status = $data4_4
if $loop_cnt == 0 then
$dnode4Status = $data4_4
elif $loop_cnt == 1 then
$dnode4Status = $data4_6
elif $loop_cnt == 2 then
$dnode4Status = $data4_8
else then
print **** **** **** END loop cluster do (loop_cnt: $loop_cnt )**** **** **** ****
return
endi
if $dnode4Status != ready then
sleep 2000
goto wait_dnode4_ready_1
endi
print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5
print $data0_6 $data1_6 $data2_6 $data3_6 $data4_6
print $data0_7 $data1_7 $data2_7 $data3_7 $data4_7
print $data0_8 $data1_8 $data2_8 $data3_8 $data4_8
print $data0_9 $data1_9 $data2_9 $data3_9 $data4_9
print ============== step13: alter replica 2
sql alter database $db replica 2
@ -476,50 +236,14 @@ if $data04 != 2 then
return -1
endi
print ============== step14: stop and drop dnode4, then remove data dir of dnode4
print ============== step14: stop and drop dnode4/dnode5, then remove data dir of dnode4/dnode5
system sh/exec.sh -n dnode4 -s stop -x SIGINT
sleep 3000
$cnt = 0
wait_dnode4_offline_1:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode4_offline_1
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
$dnode2Status = $data4_2
$dnode3Status = $data4_3
#$dnode4Status = $data4_4
if $loop_cnt == 0 then
$dnode4Status = $data4_4
elif $loop_cnt == 1 then
$dnode4Status = $data4_6
elif $loop_cnt == 2 then
$dnode4Status = $data4_8
else then
print **** **** **** END loop cluster do (loop_cnt: $loop_cnt )**** **** **** ****
return
endi
if $dnode4Status != offline then
sleep 2000
goto wait_dnode4_offline_1
endi
sleep 3000
system sh/exec.sh -n dnode5 -s stop -x SIGINT
sleep 20000
sql drop dnode $hostname4
sql drop dnode $hostname5
system rm -rf ../../../sim/dnode4/data
system rm -rf ../../../sim/dnode5/data
print ============== step15: alter replica 1
sql alter database $db replica 1
@ -530,7 +254,6 @@ if $data04 != 1 then
return -1
endi
print ============== step16: alter replica 2
sql alter database $db replica 2
sql show databases
@ -546,42 +269,7 @@ system sh/cfg.sh -n dnode1 -c second -v $hostname3
system sh/exec.sh -n dnode1 -s start
sql create dnode $hostname1
wait_dnode1_ready_0:
$cnt = $cnt + 1
if $cnt == 10 then
return -1
endi
sql show dnodes
if $rows != 3 then
sleep 2000
goto wait_dnode1_ready_0
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4
print $data0_5 $data1_5 $data2_5 $data3_5 $data4_5
#$dnode1Status = $data4_1
$dnode2Status = $data4_2
$dnode3Status = $data4_3
$dnode4Status = $data4_4
if $loop_cnt == 0 then
$dnode1Status = $data4_5
elif $loop_cnt == 1 then
$dnode1Status = $data4_7
elif $loop_cnt == 2 then
$dnode1Status = $data4_9
else then
print **** **** **** END loop cluster do (loop_cnt: $loop_cnt )**** **** **** ****
return
endi
if $dnode1Status != ready then
sleep 2000
goto wait_dnode1_ready_0
endi
sleep 20000
print ============== step18: alter replica 3
sql alter database $db replica 3