diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 3df493349e..d86e1aa0fb 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -82,6 +82,7 @@ typedef struct SJoinSupporter { char* pIdTagList; // result of first stage tags int32_t totalLen; int32_t num; + SArray* pVgroupTables; } SJoinSupporter; typedef struct SVgroupTableInfo { @@ -215,7 +216,7 @@ SQueryInfo *tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex); void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache); STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta, - SVgroupsInfo* vgroupList, SArray* pTagCols); + SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables); STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo); int32_t tscAddSubqueryInfo(SSqlCmd *pCmd); @@ -224,6 +225,8 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo); void tscClearSubqueryInfo(SSqlCmd* pCmd); void tscFreeVgroupTableInfo(SArray* pVgroupTables); +SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables); +void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index); int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex); int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo); diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 1b4f92d3fc..74ddcbd3fd 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -4025,11 +4025,11 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) { // primary ts must be existed, so no need to check its existance if (pCtx->order == TSDB_ORDER_ASC) { - tsBufAppend(pTSbuf, 0, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE); + tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].i64Key, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE); } else { for (int32_t i = pCtx->size - 1; i >= 0; --i) { char *d = GET_INPUT_CHAR_INDEX(pCtx, i); - tsBufAppend(pTSbuf, 0, &pCtx->tag, d, TSDB_KEYSIZE); + 
tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].i64Key, &pCtx->tag, d, (int32_t)TSDB_KEYSIZE); } } @@ -4048,7 +4048,7 @@ static void ts_comp_function_f(SQLFunctionCtx *pCtx, int32_t index) { STSBuf *pTSbuf = pInfo->pTSBuf; - tsBufAppend(pTSbuf, 0, &pCtx->tag, pData, TSDB_KEYSIZE); + tsBufAppend(pTSbuf, (int32_t)pCtx->param[0].i64Key, &pCtx->tag, pData, TSDB_KEYSIZE); SET_VAL(pCtx, pCtx->size, 1); pResInfo->hasResult = DATA_SET_FLAG; diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 44ccb2471a..18d72e2d1e 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -698,7 +698,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr pg *= 2; } - size_t numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups; + size_t numOfSubs = pSql->subState.numOfSub; + assert(numOfSubs <= pTableMetaInfo->vgroupList->numOfVgroups); for (int32_t i = 0; i < numOfSubs; ++i) { (*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pg, pModel); (*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index e2573f7e19..815af79d8f 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -877,22 +877,13 @@ static bool validateTableColumnInfo(tFieldList* pFieldList, SSqlCmd* pCmd) { int32_t nLen = 0; for (int32_t i = 0; i < pFieldList->nField; ++i) { - if (pFieldList->p[i].bytes == 0) { + TAOS_FIELD* pField = &pFieldList->p[i]; + + if (pField->bytes == 0) { invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); return false; } - nLen += pFieldList->p[i].bytes; - } - // max row length must be less than TSDB_MAX_BYTES_PER_ROW - if (nLen > TSDB_MAX_BYTES_PER_ROW) { - invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); - return false; - } - - // field name must be unique - for (int32_t i = 0; i < pFieldList->nField; ++i) { - TAOS_FIELD* pField = &pFieldList->p[i]; if (pField->type < TSDB_DATA_TYPE_BOOL || pField->type > 
TSDB_DATA_TYPE_NCHAR) { invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4); return false; @@ -909,10 +900,19 @@ static bool validateTableColumnInfo(tFieldList* pFieldList, SSqlCmd* pCmd) { return false; } + // field name must be unique if (has(pFieldList, i + 1, pFieldList->p[i].name) == true) { invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); return false; } + + nLen += pField->bytes; + } + + // max row length must be less than TSDB_MAX_BYTES_PER_ROW + if (nLen > TSDB_MAX_BYTES_PER_ROW) { + invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + return false; } return true; diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index dfd707344c..ac740555af 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -177,7 +177,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size pVgroupInfo->epAddr[i].port = pEpMsg->port; } - tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMeta->vgroupInfo); + tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, pVgroupInfo); pTableMeta->sversion = pTableMetaMsg->sversion; pTableMeta->tversion = pTableMetaMsg->tversion; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index a0841fa234..581271f845 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -481,14 +481,25 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { int32_t vgIndex = pTableMetaInfo->vgroupIndex; - - SVgroupsInfo* pVgroupInfo = pTableMetaInfo->vgroupList; - assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups); + if (pTableMetaInfo->pVgroupTables == NULL) { + SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList; + assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups); - pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId); + pRetrieveMsg->header.vgId = 
htonl(pVgroupInfo->vgroups[vgIndex].vgId); + tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex); + } else { + int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); + assert(vgIndex >= 0 && vgIndex < numOfVgroups); + + SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex); + + pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId); + tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex); + } } else { STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId); + tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId); } pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg); @@ -662,12 +673,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->limit = htobe64(pQueryInfo->limit.limit); pQueryMsg->offset = htobe64(pQueryInfo->limit.offset); pQueryMsg->numOfCols = htons((int16_t)taosArrayGetSize(pQueryInfo->colList)); - pQueryMsg->interval.interval = htobe64(pQueryInfo->interval.interval); - pQueryMsg->interval.sliding = htobe64(pQueryInfo->interval.sliding); + pQueryMsg->interval.interval = htobe64(pQueryInfo->interval.interval); + pQueryMsg->interval.sliding = htobe64(pQueryInfo->interval.sliding); pQueryMsg->interval.offset = htobe64(pQueryInfo->interval.offset); pQueryMsg->interval.intervalUnit = pQueryInfo->interval.intervalUnit; - pQueryMsg->interval.slidingUnit = pQueryInfo->interval.slidingUnit; - pQueryMsg->interval.offsetUnit = pQueryInfo->interval.offsetUnit; + pQueryMsg->interval.slidingUnit = pQueryInfo->interval.slidingUnit; + pQueryMsg->interval.offsetUnit = pQueryInfo->interval.offsetUnit; pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType); @@ -850,7 +861,8 @@ 
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t numOfBlocks = 0; if (pQueryInfo->tsBuf != NULL) { - STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex); + int32_t vnodeId = htonl(pQueryMsg->head.vgId); + STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, vnodeId); assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL); // this query should not be sent // todo refactor @@ -2271,7 +2283,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i); STableMeta *pTableMeta = taosCacheAcquireByData(tscMetaCache, pMInfo->pTableMeta); - tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList); + tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList, pMInfo->pVgroupTables); } if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 6b615c3a9b..794b7a068b 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -23,7 +23,6 @@ #include "tscSubquery.h" #include "tschemautil.h" #include "tsclient.h" -#include "tscSubquery.h" typedef struct SInsertSupporter { SSqlObj* pSql; @@ -59,6 +58,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ pSubQueryInfo1->tsBuf = output1; pSubQueryInfo2->tsBuf = output2; + TSKEY st = taosGetTimestampUs(); + // no result generated, return directly if (pSupporter1->pTSBuf == NULL || pSupporter2->pTSBuf == NULL) { tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql); @@ -95,7 +96,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ tscInfo("%" PRId64 ", tags:%"PRId64" \t %" PRId64 ", tags:%"PRId64, elem1.ts, 
elem1.tag.i64Key, elem2.ts, elem2.tag.i64Key); #endif - int32_t res = tVariantCompare(&elem1.tag, &elem2.tag); + int32_t res = tVariantCompare(elem1.tag, elem2.tag); if (res == -1 || (res == 0 && tsCompare(order, elem1.ts, elem2.ts))) { if (!tsBufNextPos(pSupporter1->pTSBuf)) { break; @@ -122,8 +123,9 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ win->ekey = elem1.ts; } - tsBufAppend(output1, elem1.vnode, &elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); - tsBufAppend(output2, elem2.vnode, &elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); + tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); + tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); + } else { pLimit->offset -= 1; } @@ -158,9 +160,10 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ tsBufDestroy(pSupporter1->pTSBuf); tsBufDestroy(pSupporter2->pTSBuf); - tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks " - "intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql, numOfInput1, numOfInput2, output1->numOfTotal, - win->skey, win->ekey); + TSKEY et = taosGetTimestampUs(); + tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks " + "intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elasped time:%"PRId64" us", pSql, numOfInput1, numOfInput2, output1->numOfTotal, + output1->numOfVnodes, win->skey, win->ekey, tsBufGetNumOfVnodes(output1), et - st); return output1->numOfTotal; } @@ -216,6 +219,11 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) { pSupporter->f = NULL; } + if (pSupporter->pVgroupTables != NULL) { + taosArrayDestroy(pSupporter->pVgroupTables); + pSupporter->pVgroupTables = NULL; + } + taosTFree(pSupporter->pIdTagList); tscTagCondRelease(&pSupporter->tagCond); free(pSupporter); @@ -305,7 +313,6 @@ 
static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { // set the second stage sub query for join process TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE); - memcpy(&pQueryInfo->interval, &pSupporter->interval, sizeof(pQueryInfo->interval)); tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond); @@ -324,7 +331,9 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { tscFieldInfoUpdateOffset(pNewQueryInfo); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); - + pTableMetaInfo->pVgroupTables = pSupporter->pVgroupTables; + pSupporter->pVgroupTables = NULL; + /* * When handling the projection query, the offset value will be modified for table-table join, which is changed * during the timestamp intersection. @@ -356,10 +365,39 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid); // set the tag column id for executor to extract correct tag value - pExpr->param[0].i64Key = colId; + pExpr->param[0] = (tVariant) {.i64Key = colId, .nType = TSDB_DATA_TYPE_BIGINT, .nLen = sizeof(int64_t)}; pExpr->numOfParams = 1; } + int32_t num = 0; + int32_t *list = NULL; + tsBufGetVnodeIdList(pNewQueryInfo->tsBuf, &num, &list); + + if (pTableMetaInfo->pVgroupTables != NULL) { + for(int32_t k = 0; k < taosArrayGetSize(pTableMetaInfo->pVgroupTables);) { + SVgroupTableInfo* p = taosArrayGet(pTableMetaInfo->pVgroupTables, k); + + bool found = false; + for(int32_t f = 0; f < num; ++f) { + if (p->vgInfo.vgId == list[f]) { + found = true; + break; + } + } + + if (!found) { + tscRemoveVgroupTableGroup(pTableMetaInfo->pVgroupTables, k); + } else { + k++; + } + } + + assert(taosArrayGetSize(pTableMetaInfo->pVgroupTables) > 0); + TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY); + } + + taosTFree(list); + size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, 
exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s", pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, taosArrayGetSize(pNewQueryInfo->exprList), @@ -418,6 +456,8 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) { assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey); pQueryInfo->window = *win; + + } int32_t tscCompareTidTags(const void* p1, const void* p2) { @@ -474,10 +514,11 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* SSqlCmd* pCmd = &pSql->cmd; tscClearSubqueryInfo(pCmd); tscFreeSqlResult(pSql); - + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + assert(pQueryInfo->numOfTables == 1); + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - tscInitQueryInfo(pQueryInfo); TSDB_QUERY_CLEAR_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY); @@ -524,13 +565,7 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* tscProcessSql(pSql); } -static bool checkForDuplicateTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1, SSqlObj* pPSqlObj) { - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);// todo: tags mismatch, tags not completed - SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, 0); - SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex]; - +static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSqlObj* pPSqlObj) { for(int32_t i = 1; i < p1->num; ++i) { STidTags* prev = (STidTags*) varDataVal(p1->pIdTagList + (i - 1) * p1->tagSize); STidTags* p = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize); @@ -564,7 +599,7 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar *s1 = taosArrayInit(p1->num, p1->tagSize - sizeof(int16_t)); *s2 = 
taosArrayInit(p2->num, p2->tagSize - sizeof(int16_t)); - if (!(checkForDuplicateTagVal(pQueryInfo, p1, pParentSql) && checkForDuplicateTagVal(pQueryInfo, p2, pParentSql))) { + if (!(checkForDuplicateTagVal(pColSchema, p1, pParentSql) && checkForDuplicateTagVal(pColSchema, p2, pParentSql))) { return TSDB_CODE_QRY_DUP_JOIN_KEY; } @@ -708,6 +743,12 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0); tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2); + SSqlObj* psub1 = pParentSql->pSubs[0]; + ((SJoinSupporter*)psub1->param)->pVgroupTables = tscCloneVgroupTableInfo(pTableMetaInfo1->pVgroupTables); + + SSqlObj* psub2 = pParentSql->pSubs[1]; + ((SJoinSupporter*)psub2->param)->pVgroupTables = tscCloneVgroupTableInfo(pTableMetaInfo2->pVgroupTables); + pParentSql->subState.numOfSub = 2; pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; @@ -766,9 +807,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pSupporter->pTSBuf = pBuf; } else { assert(pQueryInfo->numOfTables == 1); // for subquery, only one - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex); + tsBufMerge(pSupporter->pTSBuf, pBuf); tsBufDestroy(pBuf); } @@ -835,6 +874,8 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // launch the query the retrieve actual results from vnode along with the filtered timestamp SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex); updateQueryTimeRange(pPQueryInfo, &win); + + //update the vgroup that involved in real data query tscLaunchRealSubqueries(pParentSql); } @@ -868,20 +909,27 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR assert(pQueryInfo->numOfTables == 1); // for projection query, need to try next vnode if current vnode is 
exhausted - if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) { - pState->numOfRemain = 1; - pState->numOfSub = 1; + int32_t numOfVgroups = 0; // TODO refactor + if (pTableMetaInfo->pVgroupTables != NULL) { + numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); + } else { + numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups; + } + if ((++pTableMetaInfo->vgroupIndex) < numOfVgroups) { + tscDebug("%p no result in current vnode anymore, try next vnode, vgIndex:%d", pSql, pTableMetaInfo->vgroupIndex); pSql->cmd.command = TSDB_SQL_SELECT; pSql->fp = tscJoinQueryCallback; - tscProcessSql(pSql); + tscProcessSql(pSql); return; + } else { + tscDebug("%p no result in current subquery anymore", pSql); } } - if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { - tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pParentSql->subState.numOfRemain, pState->numOfSub); + if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { + tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfSub); return; } @@ -895,60 +943,60 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR // update the records for each subquery in parent sql object. 
for (int32_t i = 0; i < pState->numOfSub; ++i) { if (pParentSql->pSubs[i] == NULL) { + tscDebug("%p %p sub:%d not retrieve data", pParentSql, NULL, i); continue; } SSqlRes* pRes1 = &pParentSql->pSubs[i]->res; - pRes1->numOfClauseTotal += pRes1->numOfRows; + + if (pRes1->row > 0 && pRes1->numOfRows > 0) { + tscDebug("%p sub:%p index:%d numOfRows:%"PRId64" total:%"PRId64 " (not retrieve)", pParentSql, pParentSql->pSubs[i], i, + pRes1->numOfRows, pRes1->numOfTotal); + assert(pRes1->row < pRes1->numOfRows); + } else { + pRes1->numOfClauseTotal += pRes1->numOfRows; + tscDebug("%p sub:%p index:%d numOfRows:%"PRId64" total:%"PRId64, pParentSql, pParentSql->pSubs[i], i, + pRes1->numOfRows, pRes1->numOfTotal); + } } // data has retrieved to client, build the join results tscBuildResFromSubqueries(pParentSql); } -static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) { - int32_t notInvolved = 0; - SJoinSupporter* pSupporter = NULL; - SSubqueryState* pState = &pSql->subState; - - for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - if (pSql->pSubs[i] == NULL) { - notInvolved++; - } else { - pSupporter = (SJoinSupporter*)pSql->pSubs[i]->param; - } - } - - pState->numOfRemain = numOfFetch; - return pSupporter; -} - void tscFetchDatablockFromSubquery(SSqlObj* pSql) { assert(pSql->subState.numOfSub >= 1); int32_t numOfFetch = 0; - bool hasData = true; + bool hasData = true; + bool reachLimit = false; + + // if the subquery is NULL, it does not involved in the final result generation for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - // if the subquery is NULL, it does not involved in the final result generation SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { continue; } - + SSqlRes *pRes = &pSub->res; + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0); if (!tscHasReachLimitation(pQueryInfo, pRes)) { if (pRes->row >= pRes->numOfRows) { + // no data left in current result buffer hasData = false; + // The current query is 
completed for the active vnode, try next vnode if exists + // If it is completed, no need to fetch anymore. if (!pRes->completed) { numOfFetch++; } } } else { // has reach the limitation, no data anymore if (pRes->row >= pRes->numOfRows) { - hasData = false; + reachLimit = true; + hasData = false; break; } } @@ -958,29 +1006,102 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { if (hasData) { tscBuildResFromSubqueries(pSql); return; - } else if (numOfFetch <= 0) { + } + + // If at least one subquery is completed in current vnode, try the next vnode in case of multi-vnode + // super table projection query. + if (reachLimit) { pSql->res.completed = true; freeJoinSubqueryObj(pSql); - + if (pSql->res.code == TSDB_CODE_SUCCESS) { (*pSql->fp)(pSql->param, pSql, 0); } else { tscQueueAsyncRes(pSql); } - + + return; + } + + if (numOfFetch <= 0) { + bool tryNextVnode = false; + + SSqlObj* pp = pSql->pSubs[0]; + SQueryInfo* pi = tscGetQueryInfoDetail(&pp->cmd, 0); + + // get the number of subquery that need to retrieve the next vnode. 
+ if (tscNonOrderedProjectionQueryOnSTable(pi, 0)) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { + pSql->subState.numOfRemain++; + } + } + } + + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + if (pSub == NULL) { + continue; + } + + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0); + + if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows && + pSub->res.completed) { + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + assert(pQueryInfo->numOfTables == 1); + + // for projection query, need to try next vnode if current vnode is exhausted + int32_t numOfVgroups = 0; // TODO refactor + if (pTableMetaInfo->pVgroupTables != NULL) { + numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); + } else { + numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups; + } + + if ((++pTableMetaInfo->vgroupIndex) < numOfVgroups) { + tscDebug("%p no result in current vnode anymore, try next vnode, vgIndex:%d", pSub, + pTableMetaInfo->vgroupIndex); + pSub->cmd.command = TSDB_SQL_SELECT; + pSub->fp = tscJoinQueryCallback; + + tscProcessSql(pSub); + tryNextVnode = true; + } else { + tscDebug("%p no result in current subquery anymore", pSub); + } + } + } + + if (tryNextVnode) { + return; + } + + pSql->res.completed = true; + freeJoinSubqueryObj(pSql); + + if (pSql->res.code == TSDB_CODE_SUCCESS) { + (*pSql->fp)(pSql->param, pSql, 0); + } else { + tscQueueAsyncRes(pSql); + } + return; } // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled + // retrieve data from current vnode. 
tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch); - SJoinSupporter* pSupporter = tscUpdateSubqueryStatus(pSql, numOfFetch); - + SJoinSupporter* pSupporter = NULL; + pSql->subState.numOfRemain = numOfFetch; + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSql1 = pSql->pSubs[i]; if (pSql1 == NULL) { continue; } - + SSqlRes* pRes1 = &pSql1->res; SSqlCmd* pCmd1 = &pSql1->cmd; @@ -1122,7 +1243,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { * data instead of returning to its invoker */ if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { - pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; // reset the record value +// pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; // reset the record value pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->cmd.command = TSDB_SQL_FETCH; @@ -1386,7 +1507,13 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); SSubqueryState *pState = &pSql->subState; - pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups; + pState->numOfSub = 0; + if (pTableMetaInfo->pVgroupTables == NULL) { + pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups; + } else { + pState->numOfSub = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); + } + assert(pState->numOfSub > 0); int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); @@ -2017,7 +2144,7 @@ static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t column assert(pInfo->pSqlExpr != NULL); *bytes = pInfo->pSqlExpr->resBytes; - char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows; + char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + pRes->row * (*bytes); return pData; } @@ -2029,11 +2156,13 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { int32_t numOfRes = INT32_MAX; for 
(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - if (pSql->pSubs[i] == NULL) { + SSqlObj* pSub = pSql->pSubs[i]; + if (pSub == NULL) { continue; } - numOfRes = (int32_t)(MIN(numOfRes, pSql->pSubs[i]->res.numOfRows)); + int32_t remain = (int32_t)(pSub->res.numOfRows - pSub->res.row); + numOfRes = (int32_t)(MIN(numOfRes, remain)); } if (numOfRes == 0) { @@ -2059,14 +2188,23 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); for(int32_t i = 0; i < numOfExprs; ++i) { SColumnIndex* pIndex = &pRes->pColumnIndex[i]; - SSqlRes *pRes1 = &pSql->pSubs[pIndex->tableIndex]->res; - SSqlCmd *pCmd1 = &pSql->pSubs[pIndex->tableIndex]->cmd; + SSqlRes* pRes1 = &pSql->pSubs[pIndex->tableIndex]->res; + SSqlCmd* pCmd1 = &pSql->pSubs[pIndex->tableIndex]->cmd; char* pData = getResultBlockPosition(pCmd1, pRes1, pIndex->columnIndex, &bytes); memcpy(data, pData, bytes * numOfRes); data += bytes * numOfRes; - pRes1->row = numOfRes; + } + + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + if (pSub == NULL) { + continue; + } + + pSub->res.row += numOfRes; + assert(pSub->res.row <= pSub->res.numOfRows); } pRes->numOfRows = numOfRes; @@ -2085,6 +2223,8 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); + pRes->numOfCols = (int32_t)numOfExprs; + pRes->tsrow = calloc(numOfExprs, POINTER_BYTES); pRes->buffer = calloc(numOfExprs, POINTER_BYTES); pRes->length = calloc(numOfExprs, sizeof(int32_t)); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b60bf958a8..07bd9d1b07 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1121,6 +1121,8 @@ int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepco } *p1 = *pExpr; + memset(p1->param, 0, sizeof(tVariant) * tListLen(p1->param)); + for (int32_t j = 0; j < 
pExpr->numOfParams; ++j) { tVariantAssign(&p1->param[j], &pExpr->param[j]); } @@ -1678,19 +1680,62 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd) { } void tscFreeVgroupTableInfo(SArray* pVgroupTables) { - if (pVgroupTables != NULL) { - size_t num = taosArrayGetSize(pVgroupTables); - for (size_t i = 0; i < num; i++) { - SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i); - - for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { - taosTFree(pInfo->vgInfo.epAddr[j].fqdn); - } - - taosArrayDestroy(pInfo->itemList); - } - taosArrayDestroy(pVgroupTables); + if (pVgroupTables == NULL) { + return; } + + size_t num = taosArrayGetSize(pVgroupTables); + for (size_t i = 0; i < num; i++) { + SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i); + + for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { + taosTFree(pInfo->vgInfo.epAddr[j].fqdn); + } + + taosArrayDestroy(pInfo->itemList); + } + + taosArrayDestroy(pVgroupTables); +} + +void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) { + assert(pVgroupTable != NULL && index >= 0); + + size_t size = taosArrayGetSize(pVgroupTable); + assert(size > index); + + SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTable, index); + for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { + taosTFree(pInfo->vgInfo.epAddr[j].fqdn); + } + + taosArrayDestroy(pInfo->itemList); + taosArrayRemove(pVgroupTable, index); +} + +SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables) { + if (pVgroupTables == NULL) { + return NULL; + } + + size_t num = taosArrayGetSize(pVgroupTables); + SArray* pa = taosArrayInit(num, sizeof(SVgroupTableInfo)); + + SVgroupTableInfo info; + for (size_t i = 0; i < num; i++) { + SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i); + memset(&info, 0, sizeof(SVgroupTableInfo)); + + info.vgInfo = pInfo->vgInfo; + for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { + info.vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn); + } + + info.itemList = taosArrayClone(pInfo->itemList); + 
taosArrayPush(pa, &info); + } + + return pa; } void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) { @@ -1708,7 +1753,7 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool rem } STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta, - SVgroupsInfo* vgroupList, SArray* pTagCols) { + SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables) { void* pAlloc = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES); if (pAlloc == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -1742,13 +1787,15 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST if (pTagCols != NULL) { tscColumnListCopy(pTableMetaInfo->tagColList, pTagCols, -1); } + + pTableMetaInfo->pVgroupTables = tscCloneVgroupTableInfo(pVgroupTables); pQueryInfo->numOfTables += 1; return pTableMetaInfo; } STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo* pQueryInfo) { - return tscAddTableMetaInfo(pQueryInfo, NULL, NULL, NULL, NULL); + return tscAddTableMetaInfo(pQueryInfo, NULL, NULL, NULL, NULL, NULL); } void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) { @@ -1822,7 +1869,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm assert(pSql->cmd.clauseIndex == 0); STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); - tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL); + tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); registerSqlObj(pNew); return pNew; @@ -1987,14 +2034,16 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void STableMeta* pTableMeta = taosCacheAcquireByData(tscMetaCache, pTableMetaInfo->pTableMeta); // get by name may failed due to the cache cleanup assert(pTableMeta != NULL); - pFinalInfo = 
tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->tagColList); + pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, + pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables); } else { // transfer the ownership of pTableMeta to the newly create sql object. STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); STableMeta* pPrevTableMeta = taosCacheTransfer(tscMetaCache, (void**)&pPrevInfo->pTableMeta); SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList; - pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList); + pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList, + pTableMetaInfo->pVgroupTables); } if (pFinalInfo->pTableMeta == NULL) { diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 401e64acec..a30a94e35a 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -44,14 +44,17 @@ extern int32_t tsMaxShellConns; extern int32_t tsShellActivityTimer; extern uint32_t tsMaxTmrCtrl; extern float tsNumOfThreadsPerCore; -extern float tsRatioOfQueryThreads; +extern float tsRatioOfQueryThreads; // todo remove it extern int8_t tsDaylight; extern char tsTimezone[]; extern char tsLocale[]; -extern char tsCharset[]; // default encode string +extern char tsCharset[]; // default encode string extern int32_t tsEnableCoreFile; extern int32_t tsCompressMsgSize; +//query buffer management +extern int32_t tsQueryBufferSize; // maximum allowed usage buffer for each data node during query processing + // client extern int32_t tsTableMetaKeepTimer; extern int32_t tsMaxSQLStringLen; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 4fa66ffab0..9b12a65860 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -45,14 +45,14 @@ int32_t tsEnableTelemetryReporting = 1; 
char tsEmail[TSDB_FQDN_LEN] = {0}; // common -int32_t tsRpcTimer = 1000; -int32_t tsRpcMaxTime = 600; // seconds; -int32_t tsMaxShellConns = 5000; +int32_t tsRpcTimer = 1000; +int32_t tsRpcMaxTime = 600; // seconds; +int32_t tsMaxShellConns = 5000; int32_t tsMaxConnections = 5000; -int32_t tsShellActivityTimer = 3; // second -float tsNumOfThreadsPerCore = 1.0; -float tsRatioOfQueryThreads = 0.5; -int8_t tsDaylight = 0; +int32_t tsShellActivityTimer = 3; // second +float tsNumOfThreadsPerCore = 1.0f; +float tsRatioOfQueryThreads = 0.5f; +int8_t tsDaylight = 0; char tsTimezone[TSDB_TIMEZONE_LEN] = {0}; char tsLocale[TSDB_LOCALE_LEN] = {0}; char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string @@ -99,6 +99,12 @@ float tsStreamComputDelayRatio = 0.1f; int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance +// the maximum allowed query buffer size during query processing for each data node. 
+// -1 no limit (default) +// 0 no query allowed, queries are disabled +// positive value (in MB) +int32_t tsQueryBufferSize = -1; + // db parameters int32_t tsCacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE; int32_t tsBlocksPerVnode = TSDB_DEFAULT_TOTAL_BLOCKS; @@ -715,7 +721,7 @@ static void doInitGlobalConfig(void) { cfg.minValue = TSDB_MIN_CACHE_BLOCK_SIZE; cfg.maxValue = TSDB_MAX_CACHE_BLOCK_SIZE; cfg.ptrLength = 0; - cfg.unitType = TAOS_CFG_UTYPE_Mb; + cfg.unitType = TAOS_CFG_UTYPE_MB; taosInitConfigOption(cfg); cfg.option = "blocks"; @@ -878,6 +884,16 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "queryBufferSize"; + cfg.ptr = &tsQueryBufferSize; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = -1; + cfg.maxValue = 500000000000.0f; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_BYTE; + taosInitConfigOption(cfg); + // locale & charset cfg.option = "timezone"; cfg.ptr = tsTimezone; diff --git a/src/common/src/tvariant.c b/src/common/src/tvariant.c index 005def6dc5..9eb9924932 100644 --- a/src/common/src/tvariant.c +++ b/src/common/src/tvariant.c @@ -144,21 +144,24 @@ void tVariantDestroy(tVariant *pVar) { void tVariantAssign(tVariant *pDst, const tVariant *pSrc) { if (pSrc == NULL || pDst == NULL) return; - *pDst = *pSrc; - + pDst->nType = pSrc->nType; if (pSrc->nType == TSDB_DATA_TYPE_BINARY || pSrc->nType == TSDB_DATA_TYPE_NCHAR) { - int32_t len = pSrc->nLen + 1; - if (pSrc->nType == TSDB_DATA_TYPE_NCHAR) { - len = len * TSDB_NCHAR_SIZE; - } - - pDst->pz = calloc(1, len); - memcpy(pDst->pz, pSrc->pz, len); + int32_t len = pSrc->nLen + TSDB_NCHAR_SIZE; + char* p = realloc(pDst->pz, len); + assert(p); + + memset(p, 0, len); + pDst->pz = p; + + memcpy(pDst->pz, pSrc->pz, pSrc->nLen); + pDst->nLen = pSrc->nLen; return; + } - // this is only for string array - if (pSrc->nType == TSDB_DATA_TYPE_ARRAY) { + if 
(pSrc->nType >= TSDB_DATA_TYPE_BOOL && pSrc->nType <= TSDB_DATA_TYPE_DOUBLE) { + pDst->i64Key = pSrc->i64Key; + } else if (pSrc->nType == TSDB_DATA_TYPE_ARRAY) { // this is only for string array size_t num = taosArrayGetSize(pSrc->arr); pDst->arr = taosArrayInit(num, sizeof(char*)); for(size_t i = 0; i < num; i++) { @@ -166,8 +169,6 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) { char* n = strdup(p); taosArrayPush(pDst->arr, &n); } - - return; } pDst->nLen = tDataTypeDesc[pDst->nType].nSize; diff --git a/src/inc/query.h b/src/inc/query.h index 0c18f85dc3..5e1de77889 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -78,7 +78,6 @@ int32_t qKillQuery(qinfo_t qinfo); int32_t qQueryCompleted(qinfo_t qinfo); - /** * destroy query info structure * @param qHandle diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index b5ee795d03..b78a091951 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -238,6 +238,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_READY, 0, 0x0707, "Query not TAOS_DEFINE_ERROR(TSDB_CODE_QRY_HAS_RSP, 0, 0x0708, "Query should response") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_IN_EXEC, 0, 0x0709, "Multiple retrieval of this query") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW, 0, 0x070A, "Too many time window in query") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, 0, 0x070B, "Query buffer limit has reached") // grant TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 0x0800, "License expired") diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index f8f99e22c6..c29d1ec0b7 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -182,7 +182,7 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC // app name pShow->bytes[cols] = TSDB_APPNAME_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "app_name"); + strcpy(pSchema[cols].name, "program"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; diff 
--git a/src/query/inc/qTsbuf.h b/src/query/inc/qTsbuf.h index 46e6f79014..6c2a955f47 100644 --- a/src/query/inc/qTsbuf.h +++ b/src/query/inc/qTsbuf.h @@ -35,16 +35,9 @@ typedef struct STSList { int32_t len; } STSList; -typedef struct STSRawBlock { - int32_t vnode; - int64_t tag; - TSKEY* ts; - int32_t len; -} STSRawBlock; - typedef struct STSElem { TSKEY ts; - tVariant tag; + tVariant* tag; int32_t vnode; } STSElem; @@ -84,6 +77,7 @@ typedef struct STSBuf { char path[PATH_MAX]; uint32_t fileSize; + // todo use array STSVnodeBlockInfoEx* pData; uint32_t numOfAlloc; uint32_t numOfVnodes; @@ -106,12 +100,12 @@ typedef struct STSBufFileHeader { STSBuf* tsBufCreate(bool autoDelete, int32_t order); STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete); -STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder); +STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder, int32_t vnodeId); void* tsBufDestroy(STSBuf* pTSBuf); void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len); -int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx); +int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf); STSBuf* tsBufClone(STSBuf* pTSBuf); @@ -121,6 +115,7 @@ void tsBufFlush(STSBuf* pTSBuf); void tsBufResetPos(STSBuf* pTSBuf); STSElem tsBufGetElem(STSBuf* pTSBuf); + bool tsBufNextPos(STSBuf* pTSBuf); STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag); @@ -136,6 +131,10 @@ void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur); */ void tsBufDisplay(STSBuf* pTSBuf); +int32_t tsBufGetNumOfVnodes(STSBuf* pTSBuf); + +void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId); + #ifdef __cplusplus } #endif diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 5ad52ef29e..d46beab2cb 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -184,7 +184,7 
@@ static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInf static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo, - SDataStatis *pStatis, void *param, int32_t colIndex); + SDataStatis *pStatis, void *param, int32_t colIndex, int32_t vgId); static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo); @@ -194,6 +194,8 @@ static void buildTagQueryResult(SQInfo *pQInfo); static int32_t setAdditionalInfo(SQInfo *pQInfo, void *pTable, STableQueryInfo *pTableQueryInfo); static int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo); +static int32_t checkForQueryBuf(size_t numOfTables); +static void releaseQueryBuf(size_t numOfTables); bool doFilterData(SQuery *pQuery, int32_t elemPos) { for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { @@ -1005,9 +1007,10 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } + SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); - setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k); + setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId); } int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); @@ -1200,7 +1203,7 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; // compare tag first - if (tVariantCompare(&pCtx[0].tag, &elem.tag) != 0) { + if (tVariantCompare(&pCtx[0].tag, elem.tag) != 0) { return TS_JOIN_TAG_NOT_EQUALS; } @@ -1286,9 
+1289,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pDataBlock); } + SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); - setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k); + setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId); } // set the input column data @@ -1303,7 +1307,6 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS // from top to bottom in desc // from bottom to top in asc order if (pRuntimeEnv->pTSBuf != NULL) { - SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pRuntimeEnv); qDebug("QInfo:%p process data rows, numOfRows:%d, query order:%d, ts comp order:%d", pQInfo, pDataBlockInfo->rows, pQuery->order.order, pRuntimeEnv->pTSBuf->cur.order); } @@ -1409,6 +1412,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS item->lastKey = (QUERY_IS_ASC_QUERY(pQuery)? 
pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey) + step; } + if (pRuntimeEnv->pTSBuf != NULL) { + item->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); + } + // todo refactor: extract method for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) { @@ -1469,7 +1476,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl } void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo, - SDataStatis *pStatis, void *param, int32_t colIndex) { + SDataStatis *pStatis, void *param, int32_t colIndex, int32_t vgId) { int32_t functionId = pQuery->pSelectExpr[colIndex].base.functionId; int32_t colId = pQuery->pSelectExpr[colIndex].base.colInfo.colId; @@ -1542,6 +1549,9 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY } } } + } else if (functionId == TSDB_FUNC_TS_COMP) { + pCtx->param[0].i64Key = vgId; + pCtx->param[0].nType = TSDB_DATA_TYPE_BIGINT; } #if defined(_DEBUG_VIEW) @@ -2621,12 +2631,19 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) { pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) { assert(pFuncMsg->numOfParams == 1); - int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64; - SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId); + int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64; + SColumnInfo *pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId); doSetTagValueInParam(tsdb, pTable, tagColId, &pRuntimeEnv->pCtx[0].tag, pColInfo->type, pColInfo->bytes); - qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%"PRId64, pQInfo, pExprInfo->base.arg->argValue.i64, - pRuntimeEnv->pCtx[0].tag.i64Key) + + int16_t tagType = pRuntimeEnv->pCtx[0].tag.nType; + if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR) { + 
qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%s", pQInfo, + pExprInfo->base.arg->argValue.i64, pRuntimeEnv->pCtx[0].tag.pz); + } else { + qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64, pQInfo, + pExprInfo->base.arg->argValue.i64, pRuntimeEnv->pCtx[0].tag.i64Key); + } } } } @@ -3860,14 +3877,40 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, void* pTable, STableQueryInfo *pTableQ // both the master and supplement scan needs to set the correct ts comp start position if (pRuntimeEnv->pTSBuf != NULL) { + tVariant* pTag = &pRuntimeEnv->pCtx[0].tag; + if (pTableQueryInfo->cur.vgroupIndex == -1) { - tVariantAssign(&pTableQueryInfo->tag, &pRuntimeEnv->pCtx[0].tag); - tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, &pTableQueryInfo->tag); + tVariantAssign(&pTableQueryInfo->tag, pTag); + + STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, &pTableQueryInfo->tag); + + // failed to find data with the specified tag value and vnodeId + if (elem.vnode < 0) { + if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { + qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz); + } else { + qError("QInfo:%p failed to find tag:%" PRId64 " in ts_comp", pQInfo, pTag->i64Key); + } + + return false; + } // keep the cursor info of current meter - pTableQueryInfo->cur = pRuntimeEnv->pTSBuf->cur; + pTableQueryInfo->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); + if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { + qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); + } else { + qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64Key, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); + } + } else { tsBufSetCursor(pRuntimeEnv->pTSBuf, &pTableQueryInfo->cur); + + if 
(pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { + qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); + } else { + qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64Key, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); + } } } @@ -4763,15 +4806,62 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { } if (pRuntimeEnv->pTSBuf != NULL) { - if (pRuntimeEnv->cur.vgroupIndex == -1) { - STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, &pRuntimeEnv->pCtx[0].tag); + tVariant* pTag = &pRuntimeEnv->pCtx[0].tag; - // failed to find data with the specified tag value + if (pRuntimeEnv->cur.vgroupIndex == -1) { + STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag); + // failed to find data with the specified tag value and vnodeId if (elem.vnode < 0) { + if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { + qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz); + } else { + qError("QInfo:%p failed to find tag:%"PRId64" in ts_comp", pQInfo, pTag->i64Key); + } + return false; + } else { + STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); + + if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { + qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, + cur.blockIndex, cur.tsIndex); + } else { + qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64Key, + cur.blockIndex, cur.tsIndex); + } } } else { - tsBufSetCursor(pRuntimeEnv->pTSBuf, &pRuntimeEnv->cur); + STSElem elem = tsBufGetElem(pRuntimeEnv->pTSBuf); + if (tVariantCompare(elem.tag, &pRuntimeEnv->pCtx[0].tag) != 0) { + + STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, 
pTag); + // failed to find data with the specified tag value and vnodeId + if (elem1.vnode < 0) { + if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { + qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz); + } else { + qError("QInfo:%p failed to find tag:%"PRId64" in ts_comp", pQInfo, pTag->i64Key); + } + + return false; + } else { + STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); + if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { + qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, cur.blockIndex, cur.tsIndex); + } else { + qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64Key, cur.blockIndex, cur.tsIndex); + } + } + + } else { + tsBufSetCursor(pRuntimeEnv->pTSBuf, &pRuntimeEnv->cur); + STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); + if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { + qDebug("QInfo:%p continue scan ts_comp file, tag:%s blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, cur.blockIndex, cur.tsIndex); + } else { + qDebug("QInfo:%p continue scan ts_comp file, tag:%"PRId64" blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64Key, cur.blockIndex, cur.tsIndex); + } + } } } @@ -5027,6 +5117,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) { break; } + if (pRuntimeEnv->pTSBuf != NULL) { + pRuntimeEnv->cur = pRuntimeEnv->pTSBuf->cur; + } + } else { // all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter if (pQuery->rec.rows == 0) { @@ -6320,7 +6414,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ STSBuf *pTSBuf = NULL; if (pQueryMsg->tsLen > 0) { // open new file to save the result char *tsBlock = (char *) pQueryMsg + pQueryMsg->tsOffset; - pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder); + 
pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder, vgId); tsBufResetPos(pTSBuf); bool ret = tsBufNextPos(pTSBuf); @@ -6402,6 +6496,8 @@ static void freeQInfo(SQInfo *pQInfo) { qDebug("QInfo:%p start to free QInfo", pQInfo); + releaseQueryBuf(pQInfo->tableqinfoGroupInfo.numOfTables); + teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); SQuery *pQuery = pQInfo->runtimeEnv.pQuery; @@ -6636,6 +6732,11 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi assert(0); } + code = checkForQueryBuf(tableGroupInfo.numOfTables); + if (code != TSDB_CODE_SUCCESS) { // not enough query buffer, abort + goto _over; + } + (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &tableGroupInfo, pTagColumnInfo, isSTableQuery); pExprs = NULL; pGroupbyExpr = NULL; @@ -7037,6 +7138,48 @@ static void buildTagQueryResult(SQInfo* pQInfo) { setQueryStatus(pQuery, QUERY_COMPLETED); } +static int64_t getQuerySupportBufSize(size_t numOfTables) { + size_t s1 = sizeof(STableQueryInfo); + size_t s2 = sizeof(SHashNode); + +// size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb + return (int64_t)((s1 + s2) * 1.5 * numOfTables); +} + +int32_t checkForQueryBuf(size_t numOfTables) { + int64_t t = getQuerySupportBufSize(numOfTables); + if (tsQueryBufferSize < 0) { + return TSDB_CODE_SUCCESS; + } else if (tsQueryBufferSize > 0) { + + while(1) { + int64_t s = tsQueryBufferSize; + int64_t remain = s - t; + if (remain >= 0) { + if (atomic_val_compare_exchange_64(&tsQueryBufferSize, s, remain) == s) { + return TSDB_CODE_SUCCESS; + } + } else { + return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER; + } + } + } + + // disable query processing if the value of tsQueryBufferSize is zero. 
+ return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER; +} + +void releaseQueryBuf(size_t numOfTables) { + if (tsQueryBufferSize <= 0) { + return; + } + + int64_t t = getQuerySupportBufSize(numOfTables); + + // restore value is not enough buffer available + atomic_add_fetch_64(&tsQueryBufferSize, t); +} + void* qGetResultRetrieveMsg(qinfo_t qinfo) { SQInfo* pQInfo = (SQInfo*) qinfo; assert(pQInfo != NULL); diff --git a/src/query/src/qTsbuf.c b/src/query/src/qTsbuf.c index b264f6cdc9..ad29cef5c2 100644 --- a/src/query/src/qTsbuf.c +++ b/src/query/src/qTsbuf.c @@ -403,7 +403,7 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pDa } else { expandBuffer(ptsData, len); } - + tVariantAssign(&pTSBuf->block.tag, tag); memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len); @@ -561,6 +561,19 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex pCur->tsIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pBlock->numOfElem - 1; } +static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockInfo* pVInfo) { + if (offset < 0 || offset >= getDataStartOffset()) { + return -1; + } + + if (fseek(pTSBuf->f, (int32_t)offset, SEEK_SET) != 0) { + return -1; + } + + fwrite(pVInfo, sizeof(STSVnodeBlockInfo), 1, pTSBuf->f); + return 0; +} + STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId) { int32_t j = tsBufFindVnodeIndexFromId(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId); if (j == -1) { @@ -649,7 +662,7 @@ bool tsBufNextPos(STSBuf* pTSBuf) { return false; } - int32_t blockIndex = pCur->order == TSDB_ORDER_ASC ? 0 : pBlockInfo->numOfBlocks - 1; + int32_t blockIndex = (pCur->order == TSDB_ORDER_ASC) ? 
0 : (pBlockInfo->numOfBlocks - 1); tsBufGetBlock(pTSBuf, pCur->vgroupIndex + step, blockIndex); break; @@ -675,8 +688,7 @@ void tsBufResetPos(STSBuf* pTSBuf) { } STSElem tsBufGetElem(STSBuf* pTSBuf) { - STSElem elem1 = {.vnode = -1}; - + STSElem elem1 = {.vnode = -1}; if (pTSBuf == NULL) { return elem1; } @@ -690,7 +702,7 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) { elem1.vnode = pTSBuf->pData[pCur->vgroupIndex].info.vnode; elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE); - tVariantAssign(&elem1.tag, &pBlock->tag); + elem1.tag = &pBlock->tag; return elem1; } @@ -702,7 +714,7 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) { * @param vnodeId * @return */ -int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { +int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) { if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfVnodes <= 0) { return 0; } @@ -712,14 +724,13 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { } // src can only have one vnode index - if (pSrcBuf->numOfVnodes > 1) { - return -1; - } - + assert(pSrcBuf->numOfVnodes == 1); + // there are data in buffer, flush to disk first tsBufFlush(pDestBuf); // compared with the last vnode id + int32_t vnodeId = tsBufGetLastVnodeInfo((STSBuf*) pSrcBuf)->info.vnode; if (vnodeId != tsBufGetLastVnodeInfo(pDestBuf)->info.vnode) { int32_t oldSize = pDestBuf->numOfVnodes; int32_t newSize = oldSize + pSrcBuf->numOfVnodes; @@ -791,14 +802,14 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { return 0; } -STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order) { +STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order, int32_t vnodeId) { STSBuf* pTSBuf = tsBufCreate(true, order); STSVnodeBlockInfo* pBlockInfo = &(addOneVnodeInfo(pTSBuf, 0)->info); pBlockInfo->numOfBlocks = numOfBlocks; pBlockInfo->compLen = len; 
pBlockInfo->offset = getDataStartOffset(); - pBlockInfo->vnode = 0; + pBlockInfo->vnode = vnodeId; // update prev vnode length info in file TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo); @@ -902,8 +913,8 @@ void tsBufDisplay(STSBuf* pTSBuf) { while (tsBufNextPos(pTSBuf)) { STSElem elem = tsBufGetElem(pTSBuf); - if (elem.tag.nType == TSDB_DATA_TYPE_BIGINT) { - printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, elem.tag.i64Key, elem.ts); + if (elem.tag->nType == TSDB_DATA_TYPE_BIGINT) { + printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, elem.tag->i64Key, elem.ts); } } @@ -915,19 +926,6 @@ static int32_t getDataStartOffset() { return sizeof(STSBufFileHeader) + TS_COMP_FILE_VNODE_MAX * sizeof(STSVnodeBlockInfo); } -static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockInfo* pVInfo) { - if (offset < 0 || offset >= getDataStartOffset()) { - return -1; - } - - if (fseek(pTSBuf->f, (int32_t)offset, SEEK_SET) != 0) { - return -1; - } - - fwrite(pVInfo, sizeof(STSVnodeBlockInfo), 1, pTSBuf->f); - return 0; -} - // update prev vnode length info in file static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo) { int32_t offset = sizeof(STSBufFileHeader) + index * sizeof(STSVnodeBlockInfo); @@ -969,3 +967,29 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) { pTSBuf->fileSize += getDataStartOffset(); return pTSBuf; } + +int32_t tsBufGetNumOfVnodes(STSBuf* pTSBuf) { + if (pTSBuf == NULL) { + return 0; + } + + return pTSBuf->numOfVnodes; +} + +void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId) { + int32_t size = tsBufGetNumOfVnodes(pTSBuf); + if (num != NULL) { + *num = size; + } + + *vnodeId = NULL; + if (size == 0) { + return; + } + + (*vnodeId) = malloc(tsBufGetNumOfVnodes(pTSBuf) * sizeof(int32_t)); + + for(int32_t i = 0; i < size; ++i) { + (*vnodeId)[i] = pTSBuf->pData[i].info.vnode; + } +} \ No newline at end of file diff --git a/src/query/tests/tsBufTest.cpp 
b/src/query/tests/tsBufTest.cpp index b78c5314f2..8cd3a9cbef 100644 --- a/src/query/tests/tsBufTest.cpp +++ b/src/query/tests/tsBufTest.cpp @@ -304,7 +304,7 @@ void TSTraverse() { int32_t totalOutput = 10; while (1) { STSElem elem = tsBufGetElem(pTSBuf); - printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag.i64Key, elem.ts); + printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts); if (!tsBufNextPos(pTSBuf)) { break; @@ -352,7 +352,7 @@ void TSTraverse() { totalOutput = 10; while (1) { STSElem elem = tsBufGetElem(pTSBuf); - printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag.i64Key, elem.ts); + printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts); if (!tsBufNextPos(pTSBuf)) { break; @@ -416,8 +416,8 @@ void mergeDiffVnodeBufferTest() { int64_t* list = createTsList(num, start, step); t.i64Key = i; - tsBufAppend(pTSBuf1, 0, &t, (const char*)list, num * sizeof(int64_t)); - tsBufAppend(pTSBuf2, 0, &t, (const char*)list, num * sizeof(int64_t)); + tsBufAppend(pTSBuf1, 1, &t, (const char*)list, num * sizeof(int64_t)); + tsBufAppend(pTSBuf2, 9, &t, (const char*)list, num * sizeof(int64_t)); free(list); @@ -426,7 +426,7 @@ void mergeDiffVnodeBufferTest() { tsBufFlush(pTSBuf2); - tsBufMerge(pTSBuf1, pTSBuf2, 9); + tsBufMerge(pTSBuf1, pTSBuf2); EXPECT_EQ(pTSBuf1->numOfVnodes, 2); EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num); @@ -459,8 +459,6 @@ void mergeIdenticalVnodeBufferTest() { start += step * num; } - - for (int32_t i = numOfTags; i < numOfTags * 2; ++i) { int64_t* list = createTsList(num, start, step); @@ -473,7 +471,7 @@ void mergeIdenticalVnodeBufferTest() { tsBufFlush(pTSBuf2); - tsBufMerge(pTSBuf1, pTSBuf2, 12); + tsBufMerge(pTSBuf1, pTSBuf2); EXPECT_EQ(pTSBuf1->numOfVnodes, 1); EXPECT_EQ(pTSBuf1->numOfTotal, numOfTags * 2 * num); @@ -482,7 +480,7 @@ void mergeIdenticalVnodeBufferTest() { STSElem elem = tsBufGetElem(pTSBuf1); EXPECT_EQ(elem.vnode, 12); - printf("%d-%" PRIu64 "-%" PRIu64 "\n", 
elem.vnode, elem.tag.i64Key, elem.ts); + printf("%d-%" PRIu64 "-%" PRIu64 "\n", elem.vnode, elem.tag->i64Key, elem.ts); } tsBufDestroy(pTSBuf1); diff --git a/src/util/inc/tconfig.h b/src/util/inc/tconfig.h index 8bd846531f..405c5ebdce 100644 --- a/src/util/inc/tconfig.h +++ b/src/util/inc/tconfig.h @@ -54,7 +54,7 @@ enum { TAOS_CFG_UTYPE_NONE, TAOS_CFG_UTYPE_PERCENT, TAOS_CFG_UTYPE_GB, - TAOS_CFG_UTYPE_Mb, + TAOS_CFG_UTYPE_MB, TAOS_CFG_UTYPE_BYTE, TAOS_CFG_UTYPE_SECOND, TAOS_CFG_UTYPE_MS diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 6e20c1708d..5be7253f6d 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -335,7 +335,7 @@ void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) { } void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { - if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) + pCacheObj->numOfElemsInTrash == 0) { + if (pCacheObj == NULL) { return; } @@ -343,7 +343,12 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { uError("cache:%s, NULL data to release", pCacheObj->name); return; } - + + + // The operation of removal from hash table and addition to trashcan is not an atomic operation, + // therefore the check for the empty of both the hash table and the trashcan has a race condition. + // It happens when there is only one object in the cache, and two threads which has referenced this object + // start to free the it simultaneously [TD-1569]. size_t offset = offsetof(SCacheDataNode, data); SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset); diff --git a/tests/pytest/cluster/bananceTest.py b/tests/pytest/cluster/bananceTest.py new file mode 100644 index 0000000000..ef25afa7d2 --- /dev/null +++ b/tests/pytest/cluster/bananceTest.py @@ -0,0 +1,57 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. 
+# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from clusterSetup import * +from util.sql import tdSql +from util.log import tdLog +import random +import time + +class ClusterTestcase: + + ## test case 32 ## + def run(self): + + nodes = Nodes() + nodes.addConfigs("maxVgroupsPerDb", "10") + nodes.addConfigs("maxTablesPerVnode", "1000") + nodes.restartAllTaosd() + + ctest = ClusterTest(nodes.node1.hostName) + ctest.connectDB() + ctest.createSTable(1) + ctest.run() + tdSql.init(ctest.conn.cursor(), False) + + tdSql.execute("use %s" % ctest.dbName) + tdSql.query("show vgroups") + dnodes = [] + for i in range(10): + dnodes.append(int(tdSql.getData(i, 4))) + + s = set(dnodes) + if len(s) < 3: + tdLog.exit("cluster is not balanced") + + tdLog.info("cluster is balanced") + + nodes.removeConfigs("maxVgroupsPerDb", "10") + nodes.removeConfigs("maxTablesPerVnode", "1000") + nodes.restartAllTaosd() + + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +ct = ClusterTestcase() +ct.run() diff --git a/tests/pytest/cluster/basicTest.py b/tests/pytest/cluster/basicTest.py new file mode 100644 index 0000000000..b990d7fd98 --- /dev/null +++ b/tests/pytest/cluster/basicTest.py @@ -0,0 +1,47 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. 
+# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from clusterSetup import * +from util.sql import tdSql +from util.log import tdLog +import random + +class ClusterTestcase: + + ## test case 1, 33 ## + def run(self): + + nodes = Nodes() + ctest = ClusterTest(nodes.node1.hostName) + + ctest.connectDB() + tdSql.init(ctest.conn.cursor(), False) + + ## Test case 1 ## + tdLog.info("Test case 1 repeat %d times" % ctest.repeat) + for i in range(ctest.repeat): + tdLog.info("Start Round %d" % (i + 1)) + replica = random.randint(1,3) + ctest.createSTable(replica) + ctest.run() + tdLog.sleep(10) + tdSql.query("select count(*) from %s.%s" %(ctest.dbName, ctest.stbName)) + tdSql.checkData(0, 0, ctest.numberOfRecords * ctest.numberOfTables) + tdLog.info("Round %d completed" % (i + 1)) + + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +ct = ClusterTestcase() +ct.run() \ No newline at end of file diff --git a/tests/pytest/cluster/changeReplicaTest.py b/tests/pytest/cluster/changeReplicaTest.py new file mode 100644 index 0000000000..7fa68edbfe --- /dev/null +++ b/tests/pytest/cluster/changeReplicaTest.py @@ -0,0 +1,51 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. 
+# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from clusterSetup import * +from util.sql import tdSql +from util.log import tdLog +import random + +class ClusterTestcase: + + ## test case 7, ## + def run(self): + + nodes = Nodes() + ctest = ClusterTest(nodes.node1.hostName) + ctest.connectDB() + tdSql.init(ctest.conn.cursor(), False) + + tdSql.execute("use %s" % ctest.dbName) + tdSql.query("show vgroups") + for i in range(10): + tdSql.checkData(i, 5, "master") + + tdSql.execute("alter database %s replica 2" % ctest.dbName) + tdLog.sleep(30) + tdSql.query("show vgroups") + for i in range(10): + tdSql.checkData(i, 5, "master") + tdSql.checkData(i, 7, "slave") + + tdSql.execute("alter database %s replica 3" % ctest.dbName) + tdLog.sleep(30) + tdSql.query("show vgroups") + for i in range(10): + tdSql.checkData(i, 5, "master") + tdSql.checkData(i, 7, "slave") + tdSql.checkData(i, 9, "slave") + +ct = ClusterTestcase() +ct.run() \ No newline at end of file diff --git a/tests/pytest/cluster/clusterSetup.py b/tests/pytest/cluster/clusterSetup.py new file mode 100644 index 0000000000..36af8ac42e --- /dev/null +++ b/tests/pytest/cluster/clusterSetup.py @@ -0,0 +1,202 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. 
+# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import os +import sys +sys.path.insert(0, os.getcwd()) +from fabric import Connection +from util.sql import * +from util.log import * +import taos +import random +import threading +import logging + +class Node: + def __init__(self, index, username, hostIP, hostName, password, homeDir): + self.index = index + self.username = username + self.hostIP = hostIP + self.hostName = hostName + self.homeDir = homeDir + self.conn = Connection("{}@{}".format(username, hostName), connect_kwargs={"password": "{}".format(password)}) + + def startTaosd(self): + try: + self.conn.run("sudo systemctl start taosd") + except Exception as e: + print("Start Taosd error for node %d " % self.index) + logging.exception(e) + + def stopTaosd(self): + try: + self.conn.run("sudo systemctl stop taosd") + except Exception as e: + print("Stop Taosd error for node %d " % self.index) + logging.exception(e) + + def restartTaosd(self): + try: + self.conn.run("sudo systemctl restart taosd") + except Exception as e: + print("Stop Taosd error for node %d " % self.index) + logging.exception(e) + + def removeTaosd(self): + try: + self.conn.run("rmtaos") + except Exception as e: + print("remove taosd error for node %d " % self.index) + logging.exception(e) + + def installTaosd(self, packagePath): + self.conn.put(packagePath, self.homeDir) + self.conn.cd(self.homeDir) + self.conn.run("tar -zxf $(basename '%s')" % packagePath) + with self.conn.cd("TDengine-enterprise-server"): + self.conn.run("yes|./install.sh") + + def configTaosd(self, taosConfigKey, taosConfigValue): + self.conn.run("sudo echo '%s %s' >> %s" % (taosConfigKey, taosConfigValue, "/etc/taos/taos.cfg")) + + def removeTaosConfig(self, taosConfigKey, 
taosConfigValue): + self.conn.run("sudo sed --in-place -e '/%s %s/d' %s" % (taosConfigKey, taosConfigValue, "/etc/taos/taos.cfg")) + + def configHosts(self, ip, name): + self.conn.run("echo '%s %s' >> %s" % (ip, name, '/etc/hosts')) + + def removeData(self): + try: + self.conn.run("sudo rm -rf /var/lib/taos/*") + except Exception as e: + print("remove taosd data error for node %d " % self.index) + logging.exception(e) + + def removeLog(self): + try: + self.conn.run("sudo rm -rf /var/log/taos/*") + except Exception as e: + print("remove taosd error for node %d " % self.index) + logging.exception(e) + + def removeDataForMnode(self): + try: + self.conn.run("sudo rm -rf /var/lib/taos/*") + except Exception as e: + print("remove taosd error for node %d " % self.index) + logging.exception(e) + + def removeDataForVnode(self, id): + try: + self.conn.run("sudo rm -rf /var/lib/taos/vnode%d/*.data" % id) + except Exception as e: + print("remove taosd error for node %d " % self.index) + logging.exception(e) + +class Nodes: + def __init__(self): + self.node1 = Node(1, 'ubuntu', '192.168.1.52', 'node1', 'tbase125!', '/home/ubuntu') + self.node2 = Node(2, 'ubuntu', '192.168.1.53', 'node2', 'tbase125!', '/home/ubuntu') + self.node3 = Node(3, 'ubuntu', '192.168.1.54', 'node3', 'tbase125!', '/home/ubuntu') + + def stopAllTaosd(self): + self.node1.stopTaosd() + self.node2.stopTaosd() + self.node3.stopTaosd() + + def startAllTaosd(self): + self.node1.startTaosd() + self.node2.startTaosd() + self.node3.startTaosd() + + def restartAllTaosd(self): + self.node1.restartTaosd() + self.node2.restartTaosd() + self.node3.restartTaosd() + + def addConfigs(self, configKey, configValue): + self.node1.configTaosd(configKey, configValue) + self.node2.configTaosd(configKey, configValue) + self.node3.configTaosd(configKey, configValue) + + def removeConfigs(self, configKey, configValue): + self.node1.removeTaosConfig(configKey, configValue) + self.node2.removeTaosConfig(configKey, configValue) + 
self.node3.removeTaosConfig(configKey, configValue) + + def removeAllDataFiles(self): + self.node1.removeData() + self.node2.removeData() + self.node3.removeData() + +class ClusterTest: + def __init__(self, hostName): + self.host = hostName + self.user = "root" + self.password = "taosdata" + self.config = "/etc/taos" + self.dbName = "mytest" + self.stbName = "meters" + self.numberOfThreads = 20 + self.numberOfTables = 10000 + self.numberOfRecords = 1000 + self.tbPrefix = "t" + self.ts = 1538548685000 + self.repeat = 1 + + def connectDB(self): + self.conn = taos.connect( + host=self.host, + user=self.user, + password=self.password, + config=self.config) + + def createSTable(self, replica): + cursor = self.conn.cursor() + tdLog.info("drop database if exists %s" % self.dbName) + cursor.execute("drop database if exists %s" % self.dbName) + tdLog.info("create database %s replica %d" % (self.dbName, replica)) + cursor.execute("create database %s replica %d" % (self.dbName, replica)) + tdLog.info("use %s" % self.dbName) + cursor.execute("use %s" % self.dbName) + tdLog.info("drop table if exists %s" % self.stbName) + cursor.execute("drop table if exists %s" % self.stbName) + tdLog.info("create table %s(ts timestamp, current float, voltage int, phase int) tags(id int)" % self.stbName) + cursor.execute("create table %s(ts timestamp, current float, voltage int, phase int) tags(id int)" % self.stbName) + cursor.close() + + def insertData(self, threadID): + print("Thread %d: starting" % threadID) + cursor = self.conn.cursor() + tablesPerThread = int(self.numberOfTables / self.numberOfThreads) + baseTableID = tablesPerThread * threadID + for i in range (tablesPerThread): + cursor.execute("create table %s%d using %s tags(%d)" % (self.tbPrefix, baseTableID + i, self.stbName, baseTableID + i)) + query = "insert into %s%d values" % (self.tbPrefix, baseTableID + i) + base = self.numberOfRecords * i + for j in range(self.numberOfRecords): + query += "(%d, %f, %d, %d)" % (self.ts + 
base + j, random.random(), random.randint(210, 230), random.randint(0, 10)) + cursor.execute(query) + cursor.close() + print("Thread %d: finishing" % threadID) + + def run(self): + threads = [] + tdLog.info("Inserting data") + for i in range(self.numberOfThreads): + thread = threading.Thread(target=self.insertData, args=(i,)) + threads.append(thread) + thread.start() + + for i in range(self.numberOfThreads): + threads[i].join() \ No newline at end of file diff --git a/tests/pytest/cluster/dataFileRecoveryTest.py b/tests/pytest/cluster/dataFileRecoveryTest.py new file mode 100644 index 0000000000..089d3fffc1 --- /dev/null +++ b/tests/pytest/cluster/dataFileRecoveryTest.py @@ -0,0 +1,53 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from clusterSetup import * +from util.sql import tdSql +from util.log import tdLog +import random + +class ClusterTestcase: + + ## test case 20, 21, 22 ## + def run(self): + + nodes = Nodes() + ctest = ClusterTest(nodes.node1.hostName) + ctest.connectDB() + ctest.createSTable(3) + ctest.run() + tdSql.init(ctest.conn.cursor(), False) + + nodes.node2.stopTaosd() + tdSql.execute("use %s" % ctest.dbName) + tdSql.query("show vgroups") + vnodeID = tdSql.getData(0, 0) + nodes.node2.removeDataForVnode(vnodeID) + nodes.node2.startTaosd() + + # Wait for vnode file to recover + for i in range(10): + tdSql.query("select count(*) from t0") + + tdLog.sleep(10) + + for i in range(10): + tdSql.query("select count(*) from t0") + tdSql.checkData(0, 0, 1000) + + tdSql.close() + 
tdLog.success("%s successfully executed" % __file__) + +ct = ClusterTestcase() +ct.run() diff --git a/tests/pytest/cluster/fullDnodesTest.py b/tests/pytest/cluster/fullDnodesTest.py new file mode 100644 index 0000000000..3c4b10d97a --- /dev/null +++ b/tests/pytest/cluster/fullDnodesTest.py @@ -0,0 +1,47 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from clusterSetup import * +from util.sql import tdSql +from util.log import tdLog +import random + +class ClusterTestcase: + + ##Cover test case 5 ## + def run(self): + # cluster environment set up + nodes = Nodes() + nodes.addConfigs("maxVgroupsPerDb", "10") + nodes.addConfigs("maxTablesPerVnode", "1000") + nodes.restartAllTaosd() + + ctest = ClusterTest(nodes.node1.hostName) + ctest.connectDB() + ctest.createSTable(1) + ctest.run() + + tdSql.init(ctest.conn.cursor(), False) + tdSql.execute("use %s" % ctest.dbName) + tdSql.error("create table tt1 using %s tags(1)" % ctest.stbName) + + nodes.removeConfigs("maxVgroupsPerDb", "10") + nodes.removeConfigs("maxTablesPerVnode", "1000") + nodes.restartAllTaosd() + + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +ct = ClusterTestcase() +ct.run() \ No newline at end of file diff --git a/tests/pytest/cluster/killAndRestartDnodesTest.py b/tests/pytest/cluster/killAndRestartDnodesTest.py new file mode 100644 index 0000000000..be927e862f --- /dev/null +++ b/tests/pytest/cluster/killAndRestartDnodesTest.py @@ -0,0 +1,75 @@ +################################################################### +# 
Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from clusterSetup import * +from util.sql import tdSql +from util.log import tdLog +import random + +class ClusterTestcase: + + ## test case 7, 10 ## + def run(self): + # cluster environment set up + tdLog.info("Test case 7, 10") + + nodes = Nodes() + ctest = ClusterTest(nodes.node1.hostName) + ctest.connectDB() + tdSql.init(ctest.conn.cursor(), False) + + nodes.node1.stopTaosd() + tdSql.query("show dnodes") + tdSql.checkRows(3) + tdSql.checkData(0, 4, "offline") + tdSql.checkData(1, 4, "ready") + tdSql.checkData(2, 4, "ready") + + nodes.node1.startTaosd() + tdSql.checkRows(3) + tdSql.checkData(0, 4, "ready") + tdSql.checkData(1, 4, "ready") + tdSql.checkData(2, 4, "ready") + + nodes.node2.stopTaosd() + tdSql.query("show dnodes") + tdSql.checkRows(3) + tdSql.checkData(0, 4, "ready") + tdSql.checkData(1, 4, "offline") + tdSql.checkData(2, 4, "ready") + + nodes.node2.startTaosd() + tdSql.checkRows(3) + tdSql.checkData(0, 4, "ready") + tdSql.checkData(1, 4, "ready") + tdSql.checkData(2, 4, "ready") + + nodes.node3.stopTaosd() + tdSql.query("show dnodes") + tdSql.checkRows(3) + tdSql.checkData(0, 4, "ready") + tdSql.checkData(1, 4, "ready") + tdSql.checkData(2, 4, "offline") + + nodes.node3.startTaosd() + tdSql.checkRows(3) + tdSql.checkData(0, 4, "ready") + tdSql.checkData(1, 4, "ready") + tdSql.checkData(2, 4, "ready") + + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +ct = ClusterTestcase() +ct.run() \ No newline at end of file diff --git a/tests/pytest/cluster/offlineThresholdTest.py 
b/tests/pytest/cluster/offlineThresholdTest.py new file mode 100644 index 0000000000..8373424f93 --- /dev/null +++ b/tests/pytest/cluster/offlineThresholdTest.py @@ -0,0 +1,54 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from clusterSetup import * +from util.sql import tdSql +from util.log import tdLog +import random + +class ClusterTestcase: + + ## cover test case 6, 8, 9, 11 ## + def run(self): + # cluster environment set up + nodes = Nodes() + ctest = ClusterTest(nodes.node1.hostName) + ctest.connectDB() + tdSql.init(ctest.conn.cursor(), False) + + nodes.addConfigs("offlineThreshold", "10") + nodes.removeAllDataFiles() + nodes.restartAllTaosd() + nodes.node3.stopTaosd() + + tdLog.sleep(10) + tdSql.query("show dnodes") + tdSql.checkRows(3) + tdSql.checkData(2, 4, "offline") + + tdLog.sleep(60) + tdSql.checkRows(3) + tdSql.checkData(2, 4, "dropping") + + tdLog.sleep(300) + tdSql.checkRows(2) + + nodes.removeConfigs("offlineThreshold", "10") + nodes.restartAllTaosd() + + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +ct = ClusterTestcase() +ct.run() \ No newline at end of file diff --git a/tests/pytest/cluster/oneReplicaOfflineTest.py b/tests/pytest/cluster/oneReplicaOfflineTest.py new file mode 100644 index 0000000000..0223dfe01a --- /dev/null +++ b/tests/pytest/cluster/oneReplicaOfflineTest.py @@ -0,0 +1,65 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. 
+# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from clusterSetup import * +from util.sql import tdSql +from util.log import tdLog +import random + +class ClusterTestcase: + + ## test case 28, 29, 30, 31 ## + def run(self): + + nodes = Nodes() + ctest = ClusterTest(nodes.node1.hostName) + ctest.connectDB() + ctest.createSTable(3) + ctest.run() + tdSql.init(ctest.conn.cursor(), False) + + tdSql.execute("use %s" % ctest.dbName) + + nodes.node2.stopTaosd() + for i in range(100): + tdSql.execute("drop table t%d" % i) + + nodes.node2.startTaosd() + tdSql.query("show tables") + tdSql.checkRows(9900) + + nodes.node2.stopTaosd() + for i in range(10): + tdSql.execute("create table a%d using meters tags(2)" % i) + + nodes.node2.startTaosd() + tdSql.query("show tables") + tdSql.checkRows(9910) + + nodes.node2.stopTaosd() + tdSql.execute("alter table meters add col col6 int") + nodes.node2.startTaosd() + + nodes.node2.stopTaosd() + tdSql.execute("drop database %s" % ctest.dbName) + + nodes.node2.startTaosd() + tdSql.query("show databases") + tdSql.checkRows(0) + + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +ct = ClusterTestcase() +ct.run() diff --git a/tests/pytest/cluster/queryTimeTest.py b/tests/pytest/cluster/queryTimeTest.py new file mode 100644 index 0000000000..74a9081ccf --- /dev/null +++ b/tests/pytest/cluster/queryTimeTest.py @@ -0,0 +1,54 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. 
+# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from clusterSetup import * +from util.sql import tdSql +from util.log import tdLog +import random +import time + +class ClusterTestcase: + + ## test case 32 ## + def run(self): + + nodes = Nodes() + ctest = ClusterTest(nodes.node1.hostName) + ctest.connectDB() + ctest.createSTable(1) + ctest.run() + tdSql.init(ctest.conn.cursor(), False) + + tdSql.execute("use %s" % ctest.dbName) + totalTime = 0 + for i in range(10): + startTime = time.time() + tdSql.query("select * from %s" % ctest.stbName) + totalTime += time.time() - startTime + print("replica 1: average query time for %d records: %f seconds" % (ctest.numberOfTables * ctest.numberOfRecords,totalTime / 10)) + + tdSql.execute("alter database %s replica 3" % ctest.dbName) + tdLog.sleep(60) + totalTime = 0 + for i in range(10): + startTime = time.time() + tdSql.query("select * from %s" % ctest.stbName) + totalTime += time.time() - startTime + print("replica 3: average query time for %d records: %f seconds" % (ctest.numberOfTables * ctest.numberOfRecords,totalTime / 10)) + + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +ct = ClusterTestcase() +ct.run() diff --git a/tests/pytest/cluster/stopAllDnodesTest.py b/tests/pytest/cluster/stopAllDnodesTest.py new file mode 100644 index 0000000000..a71ae52e3d --- /dev/null +++ b/tests/pytest/cluster/stopAllDnodesTest.py @@ -0,0 +1,45 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. 
+# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from clusterSetup import * +from util.sql import tdSql +from util.log import tdLog +import random + +class ClusterTestcase: + + ## test case 19 ## + def run(self): + + nodes = Nodes() + ctest = ClusterTest(nodes.node1.hostName) + tdSql.init(ctest.conn.cursor(), False) + + tdSql.query("show databases") + count = tdSql.queryRows; + + nodes.stopAllTaosd() + nodes.node1.startTaosd() + tdSql.error("show databases") + + nodes.node2.startTaosd() + tdSql.error("show databases") + + nodes.node3.startTaosd() + tdLog.sleep(10) + tdSql.query("show databases") + tdSql.checkRows(count) + +ct = ClusterTestcase() +ct.run() diff --git a/tests/pytest/cluster/stopTwoDnodesTest.py b/tests/pytest/cluster/stopTwoDnodesTest.py new file mode 100644 index 0000000000..9e9958e2d3 --- /dev/null +++ b/tests/pytest/cluster/stopTwoDnodesTest.py @@ -0,0 +1,48 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. 
+# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from clusterSetup import * +from util.sql import tdSql +from util.log import tdLog +import random + +class ClusterTestcase: + + ## test case 17, 18 ## + def run(self): + + nodes = Nodes() + ctest = ClusterTest(nodes.node1.hostName) + ctest.connectDB() + ctest.createSTable(1) + ctest.run() + tdSql.init(ctest.conn.cursor(), False) + + tdSql.query("show databases") + count = tdSql.queryRows; + tdSql.execute("use %s" % ctest.dbName) + tdSql.execute("alter database %s replica 3" % ctest.dbName) + nodes.node2.stopTaosd() + nodes.node3.stopTaosd() + tdSql.error("show databases") + + nodes.node2.startTaosd() + tdSql.error("show databases") + + nodes.node3.startTaosd() + tdSql.query("show databases") + tdSql.checkRows(count) + +ct = ClusterTestcase() +ct.run() diff --git a/tests/pytest/cluster/syncingTest.py b/tests/pytest/cluster/syncingTest.py new file mode 100644 index 0000000000..96be048d23 --- /dev/null +++ b/tests/pytest/cluster/syncingTest.py @@ -0,0 +1,50 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. 
+# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from clusterSetup import * +from util.sql import tdSql +from util.log import tdLog +import random + +class ClusterTestcase: + + ## test case 24, 25, 26, 27 ## + def run(self): + + nodes = Nodes() + ctest = ClusterTest(nodes.node1.hostName) + ctest.connectDB() + ctest.createSTable(1) + ctest.run() + tdSql.init(ctest.conn.cursor(), False) + + + tdSql.execute("use %s" % ctest.dbName) + tdSql.execute("alter database %s replica 3" % ctest.dbName) + + for i in range(100): + tdSql.execute("drop table t%d" % i) + + for i in range(100): + tdSql.execute("create table a%d using meters tags(1)" % i) + + tdSql.execute("alter table meters add col col5 int") + tdSql.execute("alter table meters drop col col5 int") + tdSql.execute("drop database %s" % ctest.dbName) + + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +ct = ClusterTestcase() +ct.run() diff --git a/tests/pytest/cluster/testcluster.sh b/tests/pytest/cluster/testcluster.sh new file mode 100644 index 0000000000..6e15a498c0 --- /dev/null +++ b/tests/pytest/cluster/testcluster.sh @@ -0,0 +1,12 @@ +python3 basicTest.py +python3 bananceTest.py +python3 changeReplicaTest.py +python3 dataFileRecoveryTest.py +python3 fullDnodesTest.py +python3 killAndRestartDnodesTest.py +python3 offlineThresholdTest.py +python3 oneReplicaOfflineTest.py +python3 queryTimeTest.py +python3 stopAllDnodesTest.py +python3 stopTwoDnodesTest.py +python3 syncingTest.py \ No newline at end of file diff --git a/tests/pytest/crash_gen/crash_gen.py b/tests/pytest/crash_gen/crash_gen.py index 102d7d9bdd..739fb699d6 100755 --- a/tests/pytest/crash_gen/crash_gen.py +++ b/tests/pytest/crash_gen/crash_gen.py @@ -1231,6 +1231,7 @@ class 
Task(): 0x05, # TSDB_CODE_RPC_NOT_READY 0x0B, # Unable to establish connection, more details in TD-1648 0x200, # invalid SQL, TODO: re-examine with TD-934 + 0x20F, # query terminated, possibly due to vnoding being dropped, see TD-1776 0x217, # "db not selected", client side defined error code # 0x218, # "Table does not exist" client side defined error code 0x360, # Table already exists diff --git a/tests/pytest/stream/new.py b/tests/pytest/stream/new.py index eac93dc0e6..12ec6d4507 100644 --- a/tests/pytest/stream/new.py +++ b/tests/pytest/stream/new.py @@ -26,7 +26,6 @@ class TDTestCase: def run(self): rowNum = 200 - totalNum = 200 tdSql.prepare() tdLog.info("=============== step1") @@ -42,7 +41,9 @@ class TDTestCase: tdSql.execute("create table st as select count(*), count(tbcol), count(tbcol2) from mt interval(10s)") tdLog.info("=============== step3") + start = time.time() tdSql.waitedQuery("select * from st", 1, 120) + delay = int(time.time() - start) + 20 v = tdSql.getData(0, 3) if v >= 51: tdLog.exit("value is %d, which is larger than 51" % v) @@ -54,11 +55,18 @@ class TDTestCase: tdSql.execute("insert into tb%d values(now + %ds, %d, %d)" % (i, j, j, j)) tdLog.info("=============== step5") - tdLog.sleep(40) - tdSql.waitedQuery("select * from st order by ts desc", 1, 120) - v = tdSql.getData(0, 3) - if v <= 51: - tdLog.exit("value is %d, which is smaller than 51" % v) + maxValue = 0 + for i in range(delay): + time.sleep(1) + tdSql.query("select * from st order by ts desc") + v = tdSql.getData(0, 3) + if v > maxValue: + maxValue = v + if v > 51: + break + + if maxValue <= 51: + tdLog.exit("value is %d, which is smaller than 51" % maxValue) def stop(self): tdSql.close() diff --git a/tests/script/general/parser/groupby.sim b/tests/script/general/parser/groupby.sim index bd0d3c1a12..b70fe88e81 100644 --- a/tests/script/general/parser/groupby.sim +++ b/tests/script/general/parser/groupby.sim @@ -27,7 +27,7 @@ $mt = $mtPrefix . 
$i $tstart = 100000 -sql drop database if exits $db -x step1 +sql drop database if exists $db -x step1 step1: sql create database if not exists $db keep 36500 sql use $db diff --git a/tests/script/general/parser/join.sim b/tests/script/general/parser/join.sim index 254571bda1..79b30ffe92 100644 --- a/tests/script/general/parser/join.sim +++ b/tests/script/general/parser/join.sim @@ -24,7 +24,7 @@ $mt = $mtPrefix . $i $tstart = 100000 -sql drop database if exits $db -x step1 +sql drop database if exists $db -x step1 step1: sql create database if not exists $db keep 36500 sql use $db diff --git a/tests/script/general/parser/join_multivnode.sim b/tests/script/general/parser/join_multivnode.sim index 51f1ef11c7..5968a9cd5e 100644 --- a/tests/script/general/parser/join_multivnode.sim +++ b/tests/script/general/parser/join_multivnode.sim @@ -22,7 +22,7 @@ $mt = $mtPrefix . $i $tstart = 100000 -sql drop database if exits $db -x step1 +sql drop database if exists $db -x step1 step1: sql create database if not exists $db keep 36500 sql use $db diff --git a/tests/script/general/parser/projection_limit_offset.sim b/tests/script/general/parser/projection_limit_offset.sim index fbff99d58f..127ade66c5 100644 --- a/tests/script/general/parser/projection_limit_offset.sim +++ b/tests/script/general/parser/projection_limit_offset.sim @@ -21,7 +21,7 @@ $mt = $mtPrefix . $i $tstart = 100000 -sql drop database if exits $db -x step1 +sql drop database if exists $db -x step1 step1: sql create database if not exists $db keep 36500 sql use $db diff --git a/tests/script/general/parser/sliding.sim b/tests/script/general/parser/sliding.sim index f85211beb8..ec0e31311a 100644 --- a/tests/script/general/parser/sliding.sim +++ b/tests/script/general/parser/sliding.sim @@ -26,7 +26,7 @@ $i = 0 $db = $dbPrefix . $i $mt = $mtPrefix . 
$i -sql drop database if exits $db -x step1 +sql drop database if exists $db -x step1 step1: sql create database if not exists $db maxtables 4 keep 36500 sql use $db diff --git a/tests/script/general/parser/testSuite.sim b/tests/script/general/parser/testSuite.sim index 3dd80b8e38..b848408925 100644 --- a/tests/script/general/parser/testSuite.sim +++ b/tests/script/general/parser/testSuite.sim @@ -1,51 +1,51 @@ -sleep 2000 -run general/parser/alter.sim -sleep 2000 -run general/parser/alter1.sim -sleep 2000 -run general/parser/alter_stable.sim -sleep 2000 -run general/parser/auto_create_tb.sim -sleep 2000 -run general/parser/auto_create_tb_drop_tb.sim -sleep 2000 -run general/parser/col_arithmetic_operation.sim -sleep 2000 -run general/parser/columnValue.sim -sleep 2000 -run general/parser/commit.sim -sleep 2000 -run general/parser/create_db.sim -sleep 2000 -run general/parser/create_mt.sim -sleep 2000 -run general/parser/create_tb.sim -sleep 2000 -run general/parser/dbtbnameValidate.sim -sleep 2000 -run general/parser/fill.sim -sleep 2000 -run general/parser/fill_stb.sim -sleep 2000 -#run general/parser/fill_us.sim # -sleep 2000 -run general/parser/first_last.sim -sleep 2000 -run general/parser/import_commit1.sim -sleep 2000 -run general/parser/import_commit2.sim -sleep 2000 -run general/parser/import_commit3.sim -sleep 2000 -#run general/parser/import_file.sim -sleep 2000 -run general/parser/insert_tb.sim -sleep 2000 -run general/parser/tags_dynamically_specifiy.sim -sleep 2000 -run general/parser/interp.sim -sleep 2000 -run general/parser/lastrow.sim +#sleep 2000 +#run general/parser/alter.sim +#sleep 2000 +#run general/parser/alter1.sim +#sleep 2000 +#run general/parser/alter_stable.sim +#sleep 2000 +#run general/parser/auto_create_tb.sim +#sleep 2000 +#run general/parser/auto_create_tb_drop_tb.sim +#sleep 2000 +#run general/parser/col_arithmetic_operation.sim +#sleep 2000 +#run general/parser/columnValue.sim +#sleep 2000 +#run general/parser/commit.sim +#sleep 
2000 +#run general/parser/create_db.sim +#sleep 2000 +#run general/parser/create_mt.sim +#sleep 2000 +#run general/parser/create_tb.sim +#sleep 2000 +#run general/parser/dbtbnameValidate.sim +#sleep 2000 +#run general/parser/fill.sim +#sleep 2000 +#run general/parser/fill_stb.sim +#sleep 2000 +##run general/parser/fill_us.sim # +#sleep 2000 +#run general/parser/first_last.sim +#sleep 2000 +#run general/parser/import_commit1.sim +#sleep 2000 +#run general/parser/import_commit2.sim +#sleep 2000 +#run general/parser/import_commit3.sim +#sleep 2000 +##run general/parser/import_file.sim +#sleep 2000 +#run general/parser/insert_tb.sim +#sleep 2000 +#run general/parser/tags_dynamically_specifiy.sim +#sleep 2000 +#run general/parser/interp.sim +#sleep 2000 +#run general/parser/lastrow.sim sleep 2000 run general/parser/limit.sim sleep 2000 diff --git a/tests/script/general/parser/union.sim b/tests/script/general/parser/union.sim index 4af482bde0..024b9c76ef 100644 --- a/tests/script/general/parser/union.sim +++ b/tests/script/general/parser/union.sim @@ -27,7 +27,7 @@ $j = 1 $mt1 = $mtPrefix . $j -sql drop database if exits $db -x step1 +sql drop database if exists $db -x step1 step1: sql create database if not exists $db sql use $db diff --git a/tests/script/general/parser/where.sim b/tests/script/general/parser/where.sim index 5cac3f4723..066fac43ad 100644 --- a/tests/script/general/parser/where.sim +++ b/tests/script/general/parser/where.sim @@ -20,7 +20,7 @@ $i = 0 $db = $dbPrefix . $i $mt = $mtPrefix . $i -sql drop database if exits $db -x step1 +sql drop database if exists $db -x step1 step1: sql create database if not exists $db sql use $db