diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index d86e1aa0fb..39299d3308 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -75,6 +75,7 @@ typedef struct SJoinSupporter { SArray* exprList; SFieldInfo fieldsInfo; STagCond tagCond; + SSqlGroupbyExpr groupInfo; // group by info struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array FILE* f; // temporary file in order to create TSBuf char path[PATH_MAX]; // temporary file path, todo dynamic allocate memory @@ -225,7 +226,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo); void tscClearSubqueryInfo(SSqlCmd* pCmd); void tscFreeVgroupTableInfo(SArray* pVgroupTables); -SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables); +SArray* tscVgroupTableInfoClone(SArray* pVgroupTables); void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index); int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex); @@ -265,6 +266,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t sub void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex); int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid); +int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId); void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex); diff --git a/src/client/inc/tschemautil.h b/src/client/inc/tschemautil.h index 67942ad42a..e5bdcecfa8 100644 --- a/src/client/inc/tschemautil.h +++ b/src/client/inc/tschemautil.h @@ -77,7 +77,7 @@ SSchema *tscGetTableColumnSchema(const STableMeta *pMeta, int32_t colIndex); * @param colId * @return */ -SSchema* tscGetTableColumnSchemaById(STableMeta* pTableMeta, int16_t colId); +SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId); /** * check if the schema is valid or not, including following aspects: diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 78b0bcce9c..bbd029d675 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -128,7 +128,7 @@ typedef struct STableMetaInfo { typedef struct SSqlExpr { char aliasName[TSDB_COL_NAME_LEN]; // as aliasName SColIndex colInfo; - int64_t uid; // refactor use the pointer + uint64_t uid; // refactor use the pointer int16_t functionId; // function id in aAgg array int16_t resType; // return value type int16_t resBytes; // length of return value diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 64ba0dbf32..a85a0ea570 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -405,7 +405,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { SSqlRes *pRes = &pSql->res; pRes->code = code; - const char* msg = (pCmd->command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta"; + SSqlObj *sub = (SSqlObj*) res; + const char* msg = (sub->cmd.command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta"; if (code != TSDB_CODE_SUCCESS) { tscError("%p get %s failed, code:%s", pSql, msg, tstrerror(code)); goto _error; diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 18d72e2d1e..ea073e8192 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -1253,10 +1253,11 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur return true; } -void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) { // reset output buffer to the beginning - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { - pLocalReducer->pCtx[i].aOutputBuf = - pLocalReducer->pResultBuf->data + tscFieldInfoGetOffset(pQueryInfo, i) * pLocalReducer->resColModel->capacity; +void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {// reset output buffer to the beginning + size_t t = tscSqlExprNumOfExprs(pQueryInfo); + for (int32_t i = 0; i < t; ++i) { + SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); + pLocalReducer->pCtx[i].aOutputBuf = pLocalReducer->pResultBuf->data + pExpr->offset * pLocalReducer->resColModel->capacity; } memset(pLocalReducer->pResultBuf, 0, pLocalReducer->nResultBufSize + sizeof(tFilePage)); @@ -1501,8 +1502,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { if (pLocalReducer->discard && sameGroup) { pLocalReducer->hasUnprocessedRow = false; tmpBuffer->num = 0; - } else { - // current row does not belongs to the previous group, so it is not be handled yet. + } else { // current row does not belongs to the previous group, so it is not be handled yet. pLocalReducer->hasUnprocessedRow = true; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 8953a46e0c..f537db4e3d 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2728,7 +2728,6 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery) { int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* pCmd) { const char* msg1 = "too many columns in group by clause"; const char* msg2 = "invalid column name in group by clause"; -// const char* msg3 = "group by columns must belong to one table"; const char* msg7 = "not support group by expression"; const char* msg8 = "not allowed column type for group by"; const char* msg9 = "tags not allowed for table query"; @@ -2803,7 +2802,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* tscColumnListInsert(pTableMetaInfo->tagColList, &index); } else { // check if the column type is valid, here only support the bool/tinyint/smallint/bigint group by - if (pSchema->type > TSDB_DATA_TYPE_BINARY) { + if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP || pSchema->type == TSDB_DATA_TYPE_FLOAT || pSchema->type == TSDB_DATA_TYPE_DOUBLE) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg8); } @@ -5282,20 +5281,26 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau if (pParentQueryInfo->groupbyExpr.numOfGroupCols > 0) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, subClauseIndex); + SSqlExpr* pExpr = NULL; + size_t size = taosArrayGetSize(pQueryInfo->exprList); - - SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, (int32_t)size - 1); + if (size > 0) { + pExpr = tscSqlExprGet(pQueryInfo, (int32_t)size - 1); + } - if (pExpr->functionId != TSDB_FUNC_TAG) { - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex); - int16_t columnInfo = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid); - SColumnIndex index = {.tableIndex = 0, .columnIndex = columnInfo}; - SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); + if (pExpr == NULL || pExpr->functionId != TSDB_FUNC_TAG) { + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pParentQueryInfo, tableIndex); + + int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid); + + SSchema* pTagSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, colId); + int16_t colIndex = tscGetTagColIndexById(pTableMetaInfo->pTableMeta, colId); + SColumnIndex index = {.tableIndex = 0, .columnIndex = colIndex}; + + char* name = pTagSchema->name; + int16_t type = pTagSchema->type; + int16_t bytes = pTagSchema->bytes; - int16_t type = pSchema[index.columnIndex].type; - int16_t bytes = pSchema[index.columnIndex].bytes; - char* name = pSchema[index.columnIndex].name; - pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, bytes, true); pExpr->colInfo.flag = TSDB_COL_TAG; diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index ac740555af..2a9a9de84b 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -118,7 +118,7 @@ SSchema* tscGetTableColumnSchema(const STableMeta* pTableMeta, int32_t colIndex) } // TODO for large number of columns, employ the binary search method -SSchema* tscGetTableColumnSchemaById(STableMeta* pTableMeta, int16_t colId) { +SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId) { STableComInfo tinfo = tscGetTableInfo(pTableMeta); for(int32_t i = 0; i < tinfo.numOfColumns + tinfo.numOfTags; ++i) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index a65e7da008..2e7a99f2fb 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -150,7 +150,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pObj == NULL) return; if (pObj != pObj->signature) { - tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature); + tscError("heartbeat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature); return; } @@ -175,12 +175,12 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId)); } } else { - tscDebug("heartbeat failed, code:%s", tstrerror(code)); + tscDebug("%p heartbeat failed, code:%s", pObj->pHb, tstrerror(code)); } if (pObj->pHb != NULL) { int32_t waitingDuring = tsShellActivityTimer * 500; - tscDebug("%p start heartbeat in %dms", pSql, waitingDuring); + tscDebug("%p send heartbeat in %dms", pSql, waitingDuring); taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer); } else { @@ -208,6 +208,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { assert(*pHB->self == pHB); + pHB->retry = 0; int32_t code = tscProcessSql(pHB); taosCacheRelease(tscObjCache, (void**) &p, false); @@ -552,11 +553,28 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo)); - - size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); + + size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs); - - return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096; + + int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0; + + int32_t tableSerialize = 0; + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + if (pTableMetaInfo->pVgroupTables != NULL) { + size_t numOfGroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables); + + int32_t totalTables = 0; + for (int32_t i = 0; i < numOfGroups; ++i) { + SVgroupTableInfo *pTableInfo = taosArrayGet(pTableMetaInfo->pVgroupTables, i); + totalTables += (int32_t) taosArrayGetSize(pTableInfo->itemList); + } + + tableSerialize = totalTables * sizeof(STableIdInfo); + } + + return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + tsBufSize + + tableSerialize + 4096; } static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) { @@ -1641,11 +1659,14 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { pthread_mutex_unlock(&pObj->mutex); - tscError("%p failed to malloc for heartbeat msg", pSql); + tscError("%p failed to create heartbeat msg", pSql); return TSDB_CODE_TSC_OUT_OF_MEMORY; } + // TODO the expired hb and client can not be identified by server till now. SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload; + tstrncpy(pHeartbeat->clientVer, version, tListLen(pHeartbeat->clientVer)); + pHeartbeat->numOfQueries = numOfQueries; pHeartbeat->numOfStreams = numOfStreams; @@ -1998,10 +2019,11 @@ static void createHBObj(STscObj* pObj) { } int tscProcessConnectRsp(SSqlObj *pSql) { - char temp[TSDB_TABLE_FNAME_LEN * 2]; STscObj *pObj = pSql->pTscObj; SSqlRes *pRes = &pSql->res; + char temp[TSDB_TABLE_FNAME_LEN * 2] = {0}; + SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp; tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId)); // copy acctId from response int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db); @@ -2020,6 +2042,8 @@ int tscProcessConnectRsp(SSqlObj *pSql) { pObj->connId = htonl(pConnect->connId); createHBObj(pObj); + + //launch a timer to send heartbeat to maintain the connection and send status to mnode taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); return 0; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 794b7a068b..3e907e5d77 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -320,11 +320,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { pQueryInfo->colList = pSupporter->colList; pQueryInfo->exprList = pSupporter->exprList; pQueryInfo->fieldsInfo = pSupporter->fieldsInfo; + pQueryInfo->groupbyExpr = pSupporter->groupInfo; - pSupporter->exprList = NULL; - pSupporter->colList = NULL; - memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo)); - SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); @@ -332,7 +329,12 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); pTableMetaInfo->pVgroupTables = pSupporter->pVgroupTables; + + pSupporter->exprList = NULL; + pSupporter->colList = NULL; pSupporter->pVgroupTables = NULL; + memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo)); + memset(&pSupporter->groupInfo, 0, sizeof(SSqlGroupbyExpr)); /* * When handling the projection query, the offset value will be modified for table-table join, which is changed @@ -460,18 +462,36 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) { } -int32_t tscCompareTidTags(const void* p1, const void* p2) { - const STidTags* t1 = (const STidTags*) varDataVal(p1); - const STidTags* t2 = (const STidTags*) varDataVal(p2); +int32_t tidTagsCompar(const void* p1, const void* p2) { + const STidTags* t1 = (const STidTags*) (p1); + const STidTags* t2 = (const STidTags*) (p2); if (t1->vgId != t2->vgId) { return (t1->vgId > t2->vgId) ? 1 : -1; } - if (t1->tid != t2->tid) { - return (t1->tid > t2->tid) ? 1 : -1; + tstr* tag1 = (tstr*) t1->tag; + tstr* tag2 = (tstr*) t2->tag; + + if (tag1->len != tag2->len) { + return (tag1->len > tag2->len)? 1: -1; } - return 0; + + return strncmp(tag1->data, tag2->data, tag1->len); +} + +int32_t tagValCompar(const void* p1, const void* p2) { + const STidTags* t1 = (const STidTags*) varDataVal(p1); + const STidTags* t2 = (const STidTags*) varDataVal(p2); + + tstr* tag1 = (tstr*) t1->tag; + tstr* tag2 = (tstr*) t2->tag; + + if (tag1->len != tag2->len) { + return (tag1->len > tag2->len)? 1: -1; + } + + return strncmp(tag1->data, tag2->data, tag1->len); } void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) { @@ -587,17 +607,19 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar SJoinSupporter* p1 = pParentSql->pSubs[0]->param; SJoinSupporter* p2 = pParentSql->pSubs[1]->param; - qsort(p1->pIdTagList, p1->num, p1->tagSize, tscCompareTidTags); - qsort(p2->pIdTagList, p2->num, p2->tagSize, tscCompareTidTags); + // sort according to the tag value + qsort(p1->pIdTagList, p1->num, p1->tagSize, tagValCompar); + qsort(p2->pIdTagList, p2->num, p2->tagSize, tagValCompar); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid); - SSchema* pColSchema = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId); + SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId); // int16_t for padding - *s1 = taosArrayInit(p1->num, p1->tagSize - sizeof(int16_t)); - *s2 = taosArrayInit(p2->num, p2->tagSize - sizeof(int16_t)); + int32_t size = p1->tagSize - sizeof(int16_t); + *s1 = taosArrayInit(p1->num, size); + *s2 = taosArrayInit(p2->num, size); if (!(checkForDuplicateTagVal(pColSchema, p1, pParentSql) && checkForDuplicateTagVal(pColSchema, p2, pParentSql))) { return TSDB_CODE_QRY_DUP_JOIN_KEY; @@ -625,6 +647,14 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar } } + // reorganize the tid-tag value according to both the vgroup id and tag values + // sort according to the tag value + size_t t1 = taosArrayGetSize(*s1); + size_t t2 = taosArrayGetSize(*s2); + + qsort((*s1)->pData, t1, size, tidTagsCompar); + qsort((*s2)->pData, t2, size, tidTagsCompar); + return TSDB_CODE_SUCCESS; } @@ -744,10 +774,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2); SSqlObj* psub1 = pParentSql->pSubs[0]; - ((SJoinSupporter*)psub1->param)->pVgroupTables = tscCloneVgroupTableInfo(pTableMetaInfo1->pVgroupTables); + ((SJoinSupporter*)psub1->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo1->pVgroupTables); SSqlObj* psub2 = pParentSql->pSubs[1]; - ((SJoinSupporter*)psub2->param)->pVgroupTables = tscCloneVgroupTableInfo(pTableMetaInfo2->pVgroupTables); + ((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo2->pVgroupTables); pParentSql->subState.numOfSub = 2; pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; @@ -1313,6 +1343,9 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter return TSDB_CODE_TSC_OUT_OF_MEMORY; } + pSupporter->groupInfo = pNewQueryInfo->groupbyExpr; + memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr)); + pNew->cmd.numOfCols = 0; pNewQueryInfo->interval.interval = 0; pSupporter->limit = pNewQueryInfo->limit; @@ -1333,17 +1366,9 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter assert(pTagCond->joinInfo.hasJoin); int32_t tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->id.uid); - SSchema* s = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId); + SSchema* s = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId); - // get the tag colId column index - int32_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta); - SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); - for(int32_t i = 0; i < numOfTags; ++i) { - if (pSchema[i].colId == tagColId) { - colIndex.columnIndex = i; - break; - } - } + colIndex.columnIndex = tscGetTagColIndexById(pTableMetaInfo->pTableMeta, tagColId); int16_t bytes = 0; int16_t type = 0; @@ -2165,7 +2190,8 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { numOfRes = (int32_t)(MIN(numOfRes, remain)); } - if (numOfRes == 0) { + if (numOfRes == 0) { // no result any more, free all subquery objects + freeJoinSubqueryObj(pSql); return; } diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index bff5062f16..77d668c5af 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -80,7 +80,7 @@ int32_t tscInitRpc(const char *user, const char *secretEncrypt, void **pDnodeCon void taos_init_imp(void) { - char temp[128]; + char temp[128] = {0}; errno = TSDB_CODE_SUCCESS; srand(taosGetTimestampSec()); @@ -149,30 +149,42 @@ void taos_init_imp(void) { tscRefId = taosOpenRef(200, tscCloseTscObj); + // in other language APIs, taos_cleanup is not available yet. + // So, to make sure taos_cleanup will be invoked to clean up the allocated + // resource to suppress the valgrind warning. + atexit(taos_cleanup); tscDebug("client is initialized successfully"); } void taos_init() { pthread_once(&tscinit, taos_init_imp); } -void taos_cleanup() { - if (tscMetaCache != NULL) { - taosCacheCleanup(tscMetaCache); - tscMetaCache = NULL; +// this function may be called by user or system, or by both simultaneously. +void taos_cleanup(void) { + tscDebug("start to cleanup client environment"); - taosCacheCleanup(tscObjCache); - tscObjCache = NULL; + void* m = tscMetaCache; + if (m != NULL && atomic_val_compare_exchange_ptr(&tscMetaCache, m, 0) == m) { + taosCacheCleanup(m); } - - if (tscQhandle != NULL) { - taosCleanUpScheduler(tscQhandle); - tscQhandle = NULL; + + m = tscObjCache; + if (m != NULL && atomic_val_compare_exchange_ptr(&tscObjCache, m, 0) == m) { + taosCacheCleanup(m); + } + + m = tscQhandle; + if (m != NULL && atomic_val_compare_exchange_ptr(&tscQhandle, m, 0) == m) { + taosCleanUpScheduler(m); } taosCloseRef(tscRefId); taosCleanupKeywordsTable(); taosCloseLog(); - - taosTmrCleanUp(tscTmr); + + m = tscTmr; + if (m != NULL && atomic_val_compare_exchange_ptr(&tscTmr, m, 0) == m) { + taosTmrCleanUp(m); + } } static int taos_options_imp(TSDB_OPTION option, const char *pStr) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 85e7122b9d..e2cc440335 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1665,6 +1665,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) { if (pQueryInfo->groupbyExpr.columnInfo != NULL) { taosArrayDestroy(pQueryInfo->groupbyExpr.columnInfo); pQueryInfo->groupbyExpr.columnInfo = NULL; + pQueryInfo->groupbyExpr.numOfGroupCols = 0; } pQueryInfo->tsBuf = tsBufDestroy(pQueryInfo->tsBuf); @@ -1713,7 +1714,7 @@ void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) { taosArrayRemove(pVgroupTable, index); } -SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables) { +SArray* tscVgroupTableInfoClone(SArray* pVgroupTables) { if (pVgroupTables == NULL) { return NULL; } @@ -1739,7 +1740,7 @@ SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables) { } void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) { - tscDebug("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables); + tscDebug("%p unref %d tables in the tableMeta cache", address, pQueryInfo->numOfTables); for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); @@ -1779,6 +1780,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST pTableMetaInfo->vgroupList = tscVgroupInfoClone(vgroupList); } + // TODO handle malloc failure pTableMetaInfo->tagColList = taosArrayInit(4, POINTER_BYTES); if (pTableMetaInfo->tagColList == NULL) { return NULL; @@ -1788,7 +1790,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST tscColumnListCopy(pTableMetaInfo->tagColList, pTagCols, -1); } - pTableMetaInfo->pVgroupTables = tscCloneVgroupTableInfo(pVgroupTables); + pTableMetaInfo->pVgroupTables = tscVgroupTableInfoClone(pVgroupTables); pQueryInfo->numOfTables += 1; return pTableMetaInfo; @@ -2155,6 +2157,19 @@ int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) { } } +int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId) { + int32_t numOfTags = tscGetNumOfColumns(pTableMeta); + + SSchema* pSchema = tscGetTableTagSchema(pTableMeta); + for(int32_t i = 0; i < numOfTags; ++i) { + if (pSchema[i].colId == colId) { + return i; + } + } + + return -1; +} + bool tscIsUpdateQuery(SSqlObj* pSql) { if (pSql == NULL || pSql->signature != pSql) { terrno = TSDB_CODE_TSC_DISCONNECTED; @@ -2469,6 +2484,7 @@ void tscSCMVgroupInfoCopy(SCMVgroupInfo* dst, const SCMVgroupInfo* src) { dst->vgId = src->vgId; dst->numOfEps = src->numOfEps; for(int32_t i = 0; i < dst->numOfEps; ++i) { + taosTFree(dst->epAddr[i].fqdn); dst->epAddr[i].port = src->epAddr[i].port; dst->epAddr[i].fqdn = strdup(src->epAddr[i].fqdn); } diff --git a/src/connector/go b/src/connector/go index 8c58c512b6..8d7bf74385 160000 --- a/src/connector/go +++ b/src/connector/go @@ -1 +1 @@ -Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766 +Subproject commit 8d7bf743852897110cbdcc7c4322cd7a74d4167b diff --git a/src/inc/taos.h b/src/inc/taos.h index 3153137347..2c6454ced1 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -64,7 +64,7 @@ typedef struct taosField { #endif DLL_EXPORT void taos_init(); -DLL_EXPORT void taos_cleanup(); +DLL_EXPORT void taos_cleanup(void); DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); DLL_EXPORT void taos_close(TAOS *taos); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 51bf7bbaef..1c36934688 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -788,6 +788,7 @@ typedef struct { } SStreamDesc; typedef struct { + char clientVer[TSDB_VERSION_LEN]; uint32_t connId; int32_t pid; int32_t numOfQueries; diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index 80909e99ae..655fe71259 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -232,12 +232,16 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { } static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { - SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp)); - if (pHBRsp == NULL) { + SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp)); + if (pRsp == NULL) { return TSDB_CODE_MND_OUT_OF_MEMORY; } SCMHeartBeatMsg *pHBMsg = pMsg->rpcMsg.pCont; + if (taosCheckVersion(pHBMsg->clientVer, version, 3) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_INVALID_VERSION; // todo change the error code + } + SRpcConnInfo connInfo = {0}; rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo); @@ -251,33 +255,33 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { if (pConn == NULL) { // do not close existing links, otherwise // mError("failed to create connId, close connect"); - // pHBRsp->killConnection = 1; + // pRsp->killConnection = 1; } else { - pHBRsp->connId = htonl(pConn->connId); + pRsp->connId = htonl(pConn->connId); mnodeSaveQueryStreamList(pConn, pHBMsg); if (pConn->killed != 0) { - pHBRsp->killConnection = 1; + pRsp->killConnection = 1; } if (pConn->streamId != 0) { - pHBRsp->streamId = htonl(pConn->streamId); + pRsp->streamId = htonl(pConn->streamId); pConn->streamId = 0; } if (pConn->queryId != 0) { - pHBRsp->queryId = htonl(pConn->queryId); + pRsp->queryId = htonl(pConn->queryId); pConn->queryId = 0; } } - pHBRsp->onlineDnodes = htonl(mnodeGetOnlineDnodesNum()); - pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum()); - mnodeGetMnodeEpSetForShell(&pHBRsp->epSet); + pRsp->onlineDnodes = htonl(mnodeGetOnlineDnodesNum()); + pRsp->totalDnodes = htonl(mnodeGetDnodesNum()); + mnodeGetMnodeEpSetForShell(&pRsp->epSet); - pMsg->rpcRsp.rsp = pHBRsp; + pMsg->rpcRsp.rsp = pRsp; pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp); - + mnodeReleaseConn(pConn); return TSDB_CODE_SUCCESS; } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index dd8f83a643..7a7c1ee2d0 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -940,7 +940,6 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas sas->data = calloc(pQuery->numOfCols, POINTER_BYTES); if (sas->data == NULL) { - finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -1003,7 +1002,6 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); if (sasArray == NULL) { - finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -1277,7 +1275,6 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); if (sasArray == NULL) { - finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -1857,8 +1854,11 @@ static bool needReverseScan(SQuery *pQuery) { } if (functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_LAST_DST) { + // the scan order to acquire the last result of the specified column int32_t order = (int32_t)pQuery->pSelectExpr[i].base.arg->argValue.i64; - return order != pQuery->order.order; + if (order != pQuery->order.order) { + return true; + } } } @@ -3667,7 +3667,6 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { // check if query is killed or not if (IS_QUERY_KILLED(pQInfo)) { - finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } } @@ -4310,7 +4309,6 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; while (tsdbNextDataBlock(pQueryHandle)) { if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) { - finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -5174,7 +5172,7 @@ static void doSaveContext(SQInfo *pQInfo) { SWITCH_ORDER(pQuery->order.order); if (pRuntimeEnv->pTSBuf != NULL) { - pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order; + SWITCH_ORDER(pRuntimeEnv->pTSBuf->cur.order); } STsdbQueryCond cond = { @@ -5267,7 +5265,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { // query error occurred or query is killed, abort current execution if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) { qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); - finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -5289,7 +5286,8 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) { qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code)); - finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query + //TODO finalizeQueryResult may cause SEGSEV, since the memory may not allocated yet, add a cleanup function instead +// finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -5329,7 +5327,6 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) finalizeQueryResult(pRuntimeEnv); if (IS_QUERY_KILLED(pQInfo)) { - finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } diff --git a/src/query/src/qTokenizer.c b/src/query/src/qTokenizer.c index 0c6ee25f13..98545c8ef3 100644 --- a/src/query/src/qTokenizer.c +++ b/src/query/src/qTokenizer.c @@ -252,16 +252,16 @@ static const char isIdChar[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, /* 7x */ }; -static void* KeywordHashTable = NULL; +static void* keywordHashTable = NULL; static void doInitKeywordsTable(void) { int numOfEntries = tListLen(keywordTable); - KeywordHashTable = taosHashInit(numOfEntries, MurmurHash3_32, true, false); + keywordHashTable = taosHashInit(numOfEntries, MurmurHash3_32, true, false); for (int32_t i = 0; i < numOfEntries; i++) { keywordTable[i].len = (uint8_t)strlen(keywordTable[i].name); void* ptr = &keywordTable[i]; - taosHashPut(KeywordHashTable, keywordTable[i].name, keywordTable[i].len, (void*)&ptr, POINTER_BYTES); + taosHashPut(keywordHashTable, keywordTable[i].name, keywordTable[i].len, (void*)&ptr, POINTER_BYTES); } } @@ -283,7 +283,7 @@ int tSQLKeywordCode(const char* z, int n) { } } - SKeyword** pKey = (SKeyword**)taosHashGet(KeywordHashTable, key, n); + SKeyword** pKey = (SKeyword**)taosHashGet(keywordHashTable, key, n); return (pKey != NULL)? (*pKey)->type:TK_ID; } @@ -661,5 +661,8 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgn bool isKeyWord(const char* z, int32_t len) { return (tSQLKeywordCode((char*)z, len) != TK_ID); } void taosCleanupKeywordsTable() { - taosHashCleanup(KeywordHashTable); + void* m = keywordHashTable; + if (m != NULL && atomic_val_compare_exchange_ptr(&keywordHashTable, m, 0) == m) { + taosHashCleanup(m); + } } diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index ef635e6efc..6f5e3be8ab 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -20,6 +20,7 @@ #include "tlog.h" #include "tutil.h" #include "ttimer.h" +#include "tref.h" #include "tsocket.h" #include "tglobal.h" #include "taoserror.h" @@ -43,6 +44,7 @@ char tsNodeFqdn[TSDB_FQDN_LEN]; static ttpool_h tsTcpPool; static void * syncTmrCtrl = NULL; static void * vgIdHash; +static int tsSyncRefId = -1; // local functions static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); @@ -54,13 +56,13 @@ static int syncProcessPeerMsg(void *param, void *buffer); static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp); static void syncRemovePeer(SSyncPeer *pPeer); static void syncAddArbitrator(SSyncNode *pNode); -static void syncAddNodeRef(SSyncNode *pNode); -static void syncDecNodeRef(SSyncNode *pNode); +static void syncFreeNode(void *); static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode); static void syncMonitorFwdInfos(void *param, void *tmrId); static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code); static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle); static void syncRestartPeer(SSyncPeer *pPeer); +static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtyp); static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo); char* syncRole[] = { @@ -106,6 +108,12 @@ int32_t syncInit() { return -1; } + tsSyncRefId = taosOpenRef(200, syncFreeNode); + if (tsSyncRefId < 0) { + syncCleanUp(); + return -1; + } + tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn)); sInfo("sync module initialized successfully"); @@ -128,6 +136,9 @@ void syncCleanUp() { vgIdHash = NULL; } + taosCloseRef(tsSyncRefId); + tsSyncRefId = -1; + sInfo("sync module is cleaned up"); } @@ -159,6 +170,12 @@ void *syncStart(const SSyncInfo *pInfo) { pNode->quorum = pCfg->quorum; if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica; + int ret = taosAddRef(tsSyncRefId, pNode); + if (ret < 0) { + syncFreeNode(pNode); + return NULL; + } + for (int i = 0; i < pCfg->replica; ++i) { const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i; pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo); @@ -167,8 +184,6 @@ void *syncStart(const SSyncInfo *pInfo) { } } - syncAddNodeRef(pNode); - if (pNode->selfIndex < 0) { sInfo("vgId:%d, this node is not configured", pNode->vgId); terrno = TSDB_CODE_SYN_INVALID_CONFIG; @@ -210,7 +225,9 @@ void syncStop(void *param) { SSyncNode *pNode = param; SSyncPeer *pPeer; - if (pNode == NULL) return; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return; + sInfo("vgId:%d, cleanup sync", pNode->vgId); pthread_mutex_lock(&(pNode->mutex)); @@ -228,14 +245,17 @@ void syncStop(void *param) { pthread_mutex_unlock(&(pNode->mutex)); - syncDecNodeRef(pNode); + taosReleaseRef(tsSyncRefId, pNode); + taosRemoveRef(tsSyncRefId, pNode); } int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { SSyncNode *pNode = param; int i, j; - if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return TSDB_CODE_SYN_INVALID_CONFIG; + sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, pNode->replica); @@ -298,105 +318,63 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { syncRole[nodeRole]); syncBroadcastStatus(pNode); + taosReleaseRef(tsSyncRefId, pNode); + return 0; } int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { SSyncNode *pNode = param; - SSyncPeer *pPeer; - SSyncHead *pSyncHead; - SWalHead * pWalHead = data; - int fwdLen; - int code = 0; - if (pNode == NULL) return 0; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return 0; - if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) { - sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId, - pWalHead->version, nodeVersion); - for (int i = 0; i < pNode->replica; ++i) { - pPeer = pNode->peerInfo[i]; - syncRestartConnection(pPeer); - } - return TSDB_CODE_SYN_INVALID_VERSION; - } + int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype); - // always update version - nodeVersion = pWalHead->version; - sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole], - qtype, pWalHead->version); - - if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; - - // only pkt from RPC or CQ can be forwarded - if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; - - // a hacker way to improve the performance - pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); - pSyncHead->type = TAOS_SMSG_FORWARD; - pSyncHead->pversion = 0; - pSyncHead->len = sizeof(SWalHead) + pWalHead->len; - fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head - - pthread_mutex_lock(&(pNode->mutex)); - - for (int i = 0; i < pNode->replica; ++i) { - pPeer = pNode->peerInfo[i]; - if (pPeer == NULL || pPeer->peerFd < 0) continue; - if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; - - if (pNode->quorum > 1 && code == 0) { - syncSaveFwdInfo(pNode, pWalHead->version, mhandle); - code = 1; - } - - int retLen = write(pPeer->peerFd, pSyncHead, fwdLen); - if (retLen == fwdLen) { - sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); - } else { - sError("%s, failed to forward, ver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen); - syncRestartConnection(pPeer); - } - } - - pthread_mutex_unlock(&(pNode->mutex)); + taosReleaseRef(tsSyncRefId, pNode); return code; } void syncConfirmForward(void *param, uint64_t version, int32_t code) { SSyncNode *pNode = param; - if (pNode == NULL) return; - if (pNode->quorum <= 1) return; + + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return; SSyncPeer *pPeer = pNode->pMaster; - if (pPeer == NULL) return; + if (pPeer && pNode->quorum > 1) { + char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0}; - char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0}; + SSyncHead *pHead = (SSyncHead *)msg; + pHead->type = TAOS_SMSG_FORWARD_RSP; + pHead->len = sizeof(SFwdRsp); - SSyncHead *pHead = (SSyncHead *)msg; - pHead->type = TAOS_SMSG_FORWARD_RSP; - pHead->len = sizeof(SFwdRsp); + SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead)); + pFwdRsp->version = version; + pFwdRsp->code = code; - SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead)); - pFwdRsp->version = version; - pFwdRsp->code = code; + int msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); + int retLen = write(pPeer->peerFd, msg, msgLen); - int msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); - int retLen = write(pPeer->peerFd, msg, msgLen); - - if (retLen == msgLen) { - sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version); - } else { - sDebug("%s, failed to send forward ack, restart", pPeer->id); - syncRestartConnection(pPeer); + if (retLen == msgLen) { + sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version); + } else { + sDebug("%s, failed to send forward ack, restart", pPeer->id); + syncRestartConnection(pPeer); + } } + + taosReleaseRef(tsSyncRefId, pNode); } void syncRecover(void *param) { SSyncNode *pNode = param; SSyncPeer *pPeer; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return; + // to do: add a few lines to check if recover is OK // if take this node to unsync state, the whole system may not work @@ -414,17 +392,24 @@ void syncRecover(void *param) { } pthread_mutex_unlock(&(pNode->mutex)); + + taosReleaseRef(tsSyncRefId, pNode); } int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { SSyncNode *pNode = param; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return -1; + pNodesRole->selfIndex = pNode->selfIndex; for (int i = 0; i < pNode->replica; ++i) { pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId; pNodesRole->role[i] = pNode->peerInfo[i]->role; } + taosReleaseRef(tsSyncRefId, pNode); + return 0; } @@ -457,22 +442,20 @@ static void syncAddArbitrator(SSyncNode *pNode) { pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo); } -static void syncAddNodeRef(SSyncNode *pNode) { atomic_add_fetch_8(&pNode->refCount, 1); } +static void syncFreeNode(void *param) { + SSyncNode *pNode = param; -static void syncDecNodeRef(SSyncNode *pNode) { - if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) { - pthread_mutex_destroy(&pNode->mutex); - taosTFree(pNode->pRecv); - taosTFree(pNode->pSyncFwds); - taosTFree(pNode); - } + pthread_mutex_destroy(&pNode->mutex); + taosTFree(pNode->pRecv); + taosTFree(pNode->pSyncFwds); + taosTFree(pNode); } void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); } int syncDecPeerRef(SSyncPeer *pPeer) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { - syncDecNodeRef(pPeer->pSyncNode); + taosReleaseRef(tsSyncRefId, pPeer->pSyncNode); sDebug("%s, resource is freed", pPeer->id); taosTFree(pPeer->watchFd); @@ -529,7 +512,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer); } - syncAddNodeRef(pNode); + taosAcquireRef(tsSyncRefId, pNode); return pPeer; } @@ -1122,7 +1105,7 @@ static void syncProcessBrokenLink(void *param) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; - syncAddNodeRef(pNode); + if (taosAcquireRef(tsSyncRefId, pNode) < 0) return; pthread_mutex_lock(&(pNode->mutex)); sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno)); @@ -1133,7 +1116,7 @@ static void syncProcessBrokenLink(void *param) { } pthread_mutex_unlock(&(pNode->mutex)); - syncDecNodeRef(pNode); + taosReleaseRef(tsSyncRefId, pNode); } static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { @@ -1202,22 +1185,90 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code static void syncMonitorFwdInfos(void *param, void *tmrId) { SSyncNode *pNode = param; + + int ret = taosAcquireRef(tsSyncRefId, pNode); + if ( ret < 0) return; + SSyncFwds *pSyncFwds = pNode->pSyncFwds; - if (pSyncFwds == NULL) return; - uint64_t time = taosGetTimestampMs(); + if (pSyncFwds) {; + uint64_t time = taosGetTimestampMs(); - if (pSyncFwds->fwds > 0) { - pthread_mutex_lock(&(pNode->mutex)); - for (int i = 0; i < pSyncFwds->fwds; ++i) { - SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; - if (time - pFwdInfo->time < 2000) break; - syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); + if (pSyncFwds->fwds > 0) { + pthread_mutex_lock(&(pNode->mutex)); + for (int i = 0; i < pSyncFwds->fwds; ++i) { + SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; + if (time - pFwdInfo->time < 2000) break; + syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); + } + + syncRemoveConfirmedFwdInfo(pNode); + pthread_mutex_unlock(&(pNode->mutex)); } - syncRemoveConfirmedFwdInfo(pNode); - pthread_mutex_unlock(&(pNode->mutex)); + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); } - pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); + taosReleaseRef(tsSyncRefId, pNode); } + +static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) { + SSyncPeer *pPeer; + SSyncHead *pSyncHead; + SWalHead * pWalHead = data; + int fwdLen; + int32_t code = 0; + + if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) { + sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId, + pWalHead->version, nodeVersion); + for (int i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + syncRestartConnection(pPeer); + } + return TSDB_CODE_SYN_INVALID_VERSION; + } + + // always update version + nodeVersion = pWalHead->version; + sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole], + qtype, pWalHead->version); + + if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; + + // only pkt from RPC or CQ can be forwarded + if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; + + // a hacker way to improve the performance + pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); + pSyncHead->type = TAOS_SMSG_FORWARD; + pSyncHead->pversion = 0; + pSyncHead->len = sizeof(SWalHead) + pWalHead->len; + fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head + + pthread_mutex_lock(&(pNode->mutex)); + + for (int i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + if (pPeer == NULL || pPeer->peerFd < 0) continue; + if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; + + if (pNode->quorum > 1 && code == 0) { + syncSaveFwdInfo(pNode, pWalHead->version, mhandle); + code = 1; + } + + int retLen = write(pPeer->peerFd, pSyncHead, fwdLen); + if (retLen == fwdLen) { + sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); + } else { + sError("%s, failed to forward, ver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen); + syncRestartConnection(pPeer); + } + } + + pthread_mutex_unlock(&(pNode->mutex)); + + return code; +} + diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 352940e1e6..6811c976ca 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -563,12 +563,12 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) { void tsdbRefTable(STable *pTable) { int32_t ref = T_REF_INC(pTable); UNUSED(ref); - // tsdbDebug("ref table %"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref); + tsdbDebug("ref table %s uid %" PRIu64 " tid:%d, refCount:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), ref); } void tsdbUnRefTable(STable *pTable) { int32_t ref = T_REF_DEC(pTable); - tsdbDebug("unref table uid:%"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref); + tsdbDebug("unref table %s uid:%"PRIu64" tid:%d, refCount:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), ref); if (ref == 0) { // tsdbDebug("destory table name:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable)); @@ -890,7 +890,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro } if (lock) tsdbUnlockRepoMeta(pRepo); - tsdbDebug("vgId:%d table %s is removed from meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable)); + tsdbDebug("vgId:%d table %s uid %" PRIu64 " is removed from meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_UID(pTable)); tsdbUnRefTable(pTable); } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 33bbbee300..58bb167f70 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -2150,7 +2150,16 @@ STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) { } } - // clear current group + // clear current group, unref unused table + for (int32_t i = 0; i < numOfTables; ++i) { + STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pGroup, i); + + // keyInfo.pTable may be NULL here. + if (pKeyInfo->pTable != keyInfo.pTable) { + tsdbUnRefTable(pKeyInfo->pTable); + } + } + taosArrayClear(pGroup); // more than one table in each group, only one table left for each group diff --git a/src/util/inc/tref.h b/src/util/inc/tref.h index 6619ff407e..ead8e2eb90 100644 --- a/src/util/inc/tref.h +++ b/src/util/inc/tref.h @@ -21,15 +21,42 @@ extern "C" { #endif -int taosOpenRef(int max, void (*fp)(void *)); // return refId which will be used by other APIs -void taosCloseRef(int refId); -int taosListRef(); // return the number of references in system -int taosAddRef(int refId, void *p); -int taosAcquireRef(int refId, void *p); -void taosReleaseRef(int refId, void *p); +// open an instance, return refId which will be used by other APIs +int taosOpenRef(int max, void (*fp)(void *)); +// close the Ref instance +void taosCloseRef(int refId); + +// add ref, p is the pointer to resource or pointer ID +int taosAddRef(int refId, void *p); #define taosRemoveRef taosReleaseRef +// acquire ref, p is the pointer to resource or pointer ID +int taosAcquireRef(int refId, void *p); + +// release ref, p is the pointer to resource or pinter ID +void taosReleaseRef(int refId, void *p); + +// return the first if p is null, otherwise return the next after p +void *taosIterateRef(int refId, void *p); + +// return the number of references in system +int taosListRef(); + +/* sample code to iterate the refs + +void demoIterateRefs(int refId) { + + void *p = taosIterateRef(refId, NULL); + while (p) { + + // process P + + p = taosIterateRef(refId, p); + } +} + +*/ #ifdef __cplusplus } diff --git a/src/util/src/tref.c b/src/util/src/tref.c index 4c3b836340..23a7210e99 100644 --- a/src/util/src/tref.c +++ b/src/util/src/tref.c @@ -143,8 +143,6 @@ int taosAddRef(int refId, void *p) return TSDB_CODE_REF_INVALID_ID; } - uTrace("refId:%d p:%p try to add", refId, p); - pSet = tsRefSetList + refId; taosIncRefCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { @@ -203,8 +201,6 @@ int taosAcquireRef(int refId, void *p) return TSDB_CODE_REF_INVALID_ID; } - uTrace("refId:%d p:%p try to acquire", refId, p); - pSet = tsRefSetList + refId; taosIncRefCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { @@ -254,8 +250,6 @@ void taosReleaseRef(int refId, void *p) return; } - uTrace("refId:%d p:%p try to release", refId, p); - pSet = tsRefSetList + refId; if (pSet->state == TSDB_REF_STATE_EMPTY) { uTrace("refId:%d p:%p failed to release, cleaned", refId, p); @@ -305,6 +299,75 @@ void taosReleaseRef(int refId, void *p) if (released) taosDecRefCount(pSet); } +// if p is NULL, return the first p in hash list, otherwise, return the next after p +void *taosIterateRef(int refId, void *p) { + SRefNode *pNode = NULL; + SRefSet *pSet; + + if (refId < 0 || refId >= TSDB_REF_OBJECTS) { + uTrace("refId:%d p:%p failed to iterate, refId not valid", refId, p); + return NULL; + } + + pSet = tsRefSetList + refId; + taosIncRefCount(pSet); + if (pSet->state != TSDB_REF_STATE_ACTIVE) { + uTrace("refId:%d p:%p failed to iterate, not active", refId, p); + taosDecRefCount(pSet); + return NULL; + } + + int hash = 0; + if (p) { + hash = taosHashRef(pSet, p); + taosLockList(pSet->lockedBy+hash); + + pNode = pSet->nodeList[hash]; + while (pNode) { + if (pNode->p == p) break; + pNode = pNode->next; + } + + if (pNode == NULL) { + uError("refId:%d p:%p not there, quit", refId, p); + taosUnlockList(pSet->lockedBy+hash); + return NULL; + } + + // p is there + pNode = pNode->next; + if (pNode == NULL) { + taosUnlockList(pSet->lockedBy+hash); + hash++; + } + } + + if (pNode == NULL) { + for (; hash < pSet->max; ++hash) { + taosLockList(pSet->lockedBy+hash); + pNode = pSet->nodeList[hash]; + if (pNode) break; + taosUnlockList(pSet->lockedBy+hash); + } + } + + void *newP = NULL; + if (pNode) { + pNode->count++; // acquire it + newP = pNode->p; + taosUnlockList(pSet->lockedBy+hash); + uTrace("refId:%d p:%p is returned", refId, p); + } else { + uTrace("refId:%d p:%p the list is over", refId, p); + } + + if (p) taosReleaseRef(refId, p); // release the current one + + taosDecRefCount(pSet); + + return newP; +} + int taosListRef() { SRefSet *pSet; SRefNode *pNode; diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index 6c4af437b2..099b9d9530 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -326,6 +326,7 @@ int32_t taosHexStrToByteArray(char hexstr[], char bytes[]) { return 0; } +// TODO move to comm module bool taosGetVersionNumber(char *versionStr, int *versionNubmer) { if (versionStr == NULL || versionNubmer == NULL) { return false; diff --git a/src/util/tests/trefTest.c b/src/util/tests/trefTest.c index 486f9f6d6d..09ffccd7b5 100644 --- a/src/util/tests/trefTest.c +++ b/src/util/tests/trefTest.c @@ -17,6 +17,19 @@ typedef struct { void **p; } SRefSpace; +void iterateRefs(int refId) { + int count = 0; + + void *p = taosIterateRef(refId, NULL); + while (p) { + // process P + count++; + p = taosIterateRef(refId, p); + } + + printf(" %d ", count); +} + void *takeRefActions(void *param) { SRefSpace *pSpace = (SRefSpace *)param; int code, id; @@ -44,6 +57,9 @@ void *takeRefActions(void *param) { usleep(id % 5 + 1); taosReleaseRef(pSpace->refId, pSpace->p[id]); } + + id = random() % pSpace->refNum; + iterateRefs(id); } for (int i=0; i < pSpace->refNum; ++i) { @@ -63,7 +79,7 @@ void *openRefSpace(void *param) { SRefSpace *pSpace = (SRefSpace *)param; printf("c"); - pSpace->refId = taosOpenRef(10000, myfree); + pSpace->refId = taosOpenRef(50, myfree); if (pSpace->refId < 0) { printf("failed to open ref, reson:%s\n", tstrerror(pSpace->refId)); diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index 294bc52a94..b14321a4ef 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -154,6 +154,7 @@ python3 ./test.py -f query/queryConnection.py python3 ./test.py -f query/queryCountCSVData.py python3 ./test.py -f query/natualInterval.py python3 ./test.py -f query/bug1471.py +python3 ./test.py -f query/dataLossTest.py #stream python3 ./test.py -f stream/metric_1.py diff --git a/tests/pytest/query/dataLossTest.py b/tests/pytest/query/dataLossTest.py new file mode 100644 index 0000000000..b29dc1fa9f --- /dev/null +++ b/tests/pytest/query/dataLossTest.py @@ -0,0 +1,76 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +import os +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import * +import inspect + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + + self.numberOfTables = 240 + self.numberOfRecords = 10000 + + def run(self): + tdSql.prepare() + + os.system("yes | taosdemo -t %d -n %d" % (self.numberOfTables, self.numberOfRecords)) + print("==============step1") + + tdSql.execute("use test") + sql = "select count(*) from meters" + tdSql.query(sql) + rows = tdSql.getData(0, 0) + print ("number of records: %d" % rows) + + newRows = rows + for i in range(10000): + print("kill taosd") + time.sleep(10) + os.system("sudo kill -9 $(pgrep taosd)") + tdDnodes.startWithoutSleep(1) + while True: + try: + tdSql.query(sql) + newRows = tdSql.getData(0, 0) + print("numer of records after kill taosd %d" % newRows) + time.sleep(10) + break + except Exception as e: + pass + continue + + if newRows < rows: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + args = (caller.filename, caller.lineno, sql, newRows, rows) + tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args) + break + + tdSql.query(sql) + tdSql.checkData(0, 0, rows) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 1ac492bb3a..757399b4a2 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -15,6 +15,7 @@ import sys import os import os.path import subprocess +from time import sleep from util.log import * @@ -210,6 +211,7 @@ class TDDnode: (self.index, self.cfgPath)) def getBuildPath(self): + buildPath = "" selfPath = os.path.dirname(os.path.realpath(__file__)) if ("community" in selfPath): @@ -256,6 +258,35 @@ class TDDnode: tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index)) time.sleep(5) + + def startWithoutSleep(self): + buildPath = self.getBuildPath() + + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + + binPath = buildPath + "/build/bin/taosd" + + if self.deployed == 0: + tdLog.exit("dnode:%d is not deployed" % (self.index)) + + if self.valgrind == 0: + cmd = "nohup %s -c %s > /dev/null 2>&1 & " % ( + binPath, self.cfgDir) + else: + valgrindCmdline = "valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes" + + cmd = "nohup %s %s -c %s 2>&1 & " % ( + valgrindCmdline, binPath, self.cfgDir) + + print(cmd) + + if os.system(cmd) != 0: + tdLog.exit(cmd) + self.running = 1 + tdLog.debug("dnode:%d is running with %s " % (self.index, cmd)) def stop(self): if self.valgrind == 0: @@ -425,6 +456,10 @@ class TDDnodes: def start(self, index): self.check(index) self.dnodes[index - 1].start() + + def startWithoutSleep(self, index): + self.check(index) + self.dnodes[index - 1].startWithoutSleep() def stop(self, index): self.check(index) diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 9abec354c6..b2ed6212fd 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -25,7 +25,7 @@ class TDSql: self.queryCols = 0 self.affectedRows = 0 - def init(self, cursor, log=True): + def init(self, cursor, log=False): self.cursor = cursor if (log): diff --git a/tests/script/general/parser/join_multivnode.sim b/tests/script/general/parser/join_multivnode.sim index 5968a9cd5e..5e4a0990c1 100644 --- a/tests/script/general/parser/join_multivnode.sim +++ b/tests/script/general/parser/join_multivnode.sim @@ -132,4 +132,134 @@ sql select join_mt0.ts, join_mt1.t1, join_mt0.t1, join_mt1.tbname, join_mt0.tbna sql select join_mt0.ts, join_mt1.t1, join_mt0.t1, join_mt1.tbname, join_mt0.tbname from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1 limit 1 +#1970-01-01 08:01:40.800 | 10 | 45.000000000 | 0 | true | false | 0 | +#1970-01-01 08:01:40.790 | 10 | 945.000000000 | 90 | true | true | 0 | + +sql select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc limit 20 offset 19; +if $rows != 100 then + return -1 +endi + +# c5 is null ! error + +sql select count(join_mt0.c1), sum(join_mt0.c2)/count(*), avg(c2), first(join_mt0.c5), last(c7) from join_mt0 interval(10a) group by join_mt0.t1 order by join_mt0.ts desc; +if $rows != 100 then + return -1 +endi + +if $data00 != @70-01-01 08:01:40.990@ then + print expect 0, actual: $data00 + return -1 +endi + +if $data01 != 30 then + return -1 +endi + +if $data02 != 94.500000000 then + print expect 94.500000000, actual $data02 + return -1 +endi + +if $data03 != 94.500000000 then + return -1 +endi + +if $data04 != 90 then + return -1 +endi + +if $data05 != 1 then + return -1 +endi + +if $data06 != 2 then + return -1 +endi + +if $data10 != @70-01-01 08:01:40.980@ then + print expect 70-01-01 08:01:40.980, actual: $data10 + return -1 +endi + +if $data11 != 30 then + return -1 +endi + +if $data12 != 84.500000000 then + print expect 84.500000000, actual $data12 + return -1 +endi + +if $data13 != 84.500000000 then + return -1 +endi + +if $data14 != 80 then + return -1 +endi + +if $data15 != 1 then + return -1 +endi + +if $data16 != 2 then + return -1 +endi + +# this function will cause shell crash +sql select count(join_mt0.c1), first(join_mt0.c1) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc; +if $rows != 100 then + return -1 +endi + +if $data00 != @70-01-01 08:01:40.990@ then + return -1 +endi + +if $data01 != 10 then + return -1 +endi + +if $data02 != 90 then + return -1 +endi + +if $data03 != 0 then + return -1 +endi + +if $data11 != 10 then + return -1 +endi + +if $data12 != 80 then + return -1 +endi + +if $data13 != 0 then + return -1 +endi + +sql select last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10m) group by join_mt0.t1 order by join_mt0.ts asc; +if $rows != 1 then + return -1 +endi + +if $data00 != @70-01-01 08:00:00.000@ then + return -1 +endi + +if $data01 != 1 then + return -1 +endi + +if $data02 != 0 then + return -1 +endi + +if $data03 != 0 then + return -1 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/general/parser/projection_limit_offset.sim b/tests/script/general/parser/projection_limit_offset.sim index 127ade66c5..7c83ca0c2f 100644 --- a/tests/script/general/parser/projection_limit_offset.sim +++ b/tests/script/general/parser/projection_limit_offset.sim @@ -324,8 +324,22 @@ sql create table tm0 using m1 tags(1); sql create table tm1 using m1 tags(2); sql insert into tm0 values(10000, 1) (20000, 2)(30000, 3) (40000, NULL) (50000, 2) tm1 values(10001, 2)(20000,4)(90000,9); -sql select count(*),first(k),last(k) from m1 where tbname in ('tm0') interval(1s) order by ts desc; +#=============================tbase-1205 +sql select count(*) from tm1 where ts= now -1d interval(1h) fill(NULL); + +print ===================>TD-1834 +sql select * from tm0 where ts>11000 and ts< 20000 order by ts asc +if $rows != 0 then + return -1 +endi + +sql select * from tm0 where ts>11000 and ts< 20000 order by ts desc +if $rows != 0 then + return -1 +endi + +sql select count(*),first(k),last(k) from m1 where tbname in ('tm0') interval(1s) order by ts desc; if $row != 5 then return -1 endi @@ -386,7 +400,25 @@ sql_error select k+1,sum(k) from tm0; sql_error select k, sum(k) from tm0; sql_error select k, sum(k)+1 from tm0; +print ================== restart server to commit data into disk +system sh/exec.sh -n dnode1 -s stop -x SIGINT +sleep 5000 +system sh/exec.sh -n dnode1 -s start +print ================== server restart completed + #=============================tbase-1205 sql select count(*) from tm1 where ts= now -1d interval(1h) fill(NULL); +print ===================>TD-1834 +sql select * from tm0 where ts>11000 and ts< 20000 order by ts asc +if $rows != 0 then + return -1 +endi + +sql select * from tm0 where ts>11000 and ts< 20000 order by ts desc +if $rows != 0 then + return -1 +endi + +