diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 76a9bbac10..3df493349e 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -149,14 +149,13 @@ int tscAllocPayload(SSqlCmd* pCmd, int size); TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes); -SFieldSupInfo* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField); -SFieldSupInfo* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* field); +SInternalField* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField); +SInternalField* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* field); -SFieldSupInfo* tscFieldInfoGetSupp(SFieldInfo* pFieldInfo, int32_t index); +SInternalField* tscFieldInfoGetInternalField(SFieldInfo* pFieldInfo, int32_t index); TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index); void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo); -void tscFieldInfoCopy(SFieldInfo* dst, const SFieldInfo* src); void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo); int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index); @@ -231,10 +230,11 @@ int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo); int tscGetMeterMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists); void tscResetForNextRetrieve(SSqlRes* pRes); - -void tscAddTimestampColumn(SQueryInfo* pQueryInfo, int16_t functionId, int16_t tableIndex); void tscDoQuery(SSqlObj* pSql); +SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *pInfo); +void* tscVgroupInfoClear(SVgroupsInfo *pInfo); +void tscSCMVgroupInfoCopy(SCMVgroupInfo* dst, const SCMVgroupInfo* src); /** * The create object function must be successful expect for the out of memory issue. * diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 74550c56e8..fa215db270 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -90,10 +90,10 @@ typedef struct STableComInfo { } STableComInfo; typedef struct SCMCorVgroupInfo { - int32_t version; - int8_t inUse; - int8_t numOfEps; - SEpAddr epAddr[TSDB_MAX_REPLICA]; + int32_t version; + int8_t inUse; + int8_t numOfEps; + SEpAddr1 epAddr[TSDB_MAX_REPLICA]; } SCMCorVgroupInfo; typedef struct STableMeta { @@ -142,16 +142,17 @@ typedef struct SColumnIndex { int16_t columnIndex; } SColumnIndex; -typedef struct SFieldSupInfo { +typedef struct SInternalField { + TAOS_FIELD field; bool visible; SExprInfo *pArithExprInfo; SSqlExpr *pSqlExpr; -} SFieldSupInfo; +} SInternalField; typedef struct SFieldInfo { - int16_t numOfOutput; // number of column in result - SArray *pFields; // SArray - SArray *pSupportInfo; // SArray + int16_t numOfOutput; // number of column in result + TAOS_FIELD* final; + SArray *internalField; // SArray } SFieldInfo; typedef struct SColumn { @@ -443,6 +444,7 @@ void tscPartiallyFreeSqlObj(SSqlObj *pSql); */ void tscFreeSqlObj(SSqlObj *pSql); void tscFreeRegisteredSqlObj(void *pSql); +void tscFreeTableMetaHelper(void *pTableMeta); void tscCloseTscObj(STscObj *pObj); @@ -467,7 +469,7 @@ int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* s int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) { - SFieldSupInfo* pInfo = (SFieldSupInfo*) TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, columnIndex); + SInternalField* pInfo = (SInternalField*) TARRAY_GET_ELEM(pFieldInfo->internalField, columnIndex); assert(pInfo->pSqlExpr != NULL); int32_t type = pInfo->pSqlExpr->resType; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 639de294e6..c996bb2a76 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -327,7 +327,7 @@ void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) { } for (int i = 0; i < pCmd->numOfCols; ++i){ - SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i); + SInternalField* pSup = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i); if (pSup->pSqlExpr != NULL) { // pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pSup->pSqlExpr->resBytes * pRes->row; } else { @@ -348,7 +348,7 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); for (int i = 0; i < pCmd->numOfCols; ++i) { - SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i); + SInternalField* pSup = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i); if (pSup->pSqlExpr != NULL) { tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i); @@ -405,11 +405,11 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { SSqlRes *pRes = &pSql->res; pRes->code = code; + const char* msg = (pCmd->command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta"; if (code != TSDB_CODE_SUCCESS) { - tscError("%p get tableMeta failed, code:%s", pSql, tstrerror(code)); + tscError("%p get %s failed, code:%s", pSql, msg, tstrerror(code)); goto _error; } else { - const char* msg = (pCmd->command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta"; tscDebug("%p get %s successfully", pSql, msg); } diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 929b9ffe68..b4c3f3549b 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -239,7 +239,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, TAOS_FIELD f = {.type = TSDB_DATA_TYPE_BINARY, .bytes = (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE}; tstrncpy(f.name, "Field", sizeof(f.name)); - SFieldSupInfo* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); + SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE, (TSDB_COL_NAME_LEN - 1), false); @@ -485,7 +485,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const tstrncpy(f.name, "Database", sizeof(f.name)); } - SFieldSupInfo* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); + SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, f.bytes, f.bytes - VARSTR_HEADER_SIZE, false); @@ -922,19 +922,17 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa pQueryInfo->order.order = TSDB_ORDER_ASC; tscFieldInfoClear(&pQueryInfo->fieldsInfo); - pQueryInfo->fieldsInfo.pFields = taosArrayInit(1, sizeof(TAOS_FIELD)); - pQueryInfo->fieldsInfo.pSupportInfo = taosArrayInit(1, sizeof(SFieldSupInfo)); + pQueryInfo->fieldsInfo.internalField = taosArrayInit(1, sizeof(SInternalField)); TAOS_FIELD f = tscCreateField((int8_t)type, columnName, (int16_t)valueLength); tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); tscInitResObjForLocalQuery(pSql, 1, (int32_t)valueLength); - TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0); - SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, 0); + SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, 0); pInfo->pSqlExpr = taosArrayGetP(pQueryInfo->exprList, 0); - memcpy(pRes->data, val, pField->bytes); + memcpy(pRes->data, val, pInfo->field.bytes); } int tscProcessLocalCmd(SSqlObj *pSql) { diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index d800d62c8a..44ccb2471a 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -510,7 +510,8 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { taosTFree(pLocalReducer->pResultBuf); if (pLocalReducer->pResInfo != NULL) { - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { + size_t num = tscSqlExprNumOfExprs(pQueryInfo); + for (int32_t i = 0; i < num; ++i) { taosTFree(pLocalReducer->pResInfo[i].interResultBuf); } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 055d44ff1e..e2573f7e19 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -122,7 +122,7 @@ static int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSql static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index); -static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pSqlExpr, SArray* pExprInfo, SQueryInfo* pQueryInfo, SArray* pCols); +static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pSqlExpr, SQueryInfo* pQueryInfo, SArray* pCols, int64_t *uid); /* * Used during parsing query sql. Since the query sql usually small in length, error position @@ -190,7 +190,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { SSqlRes* pRes = &pSql->res; int32_t code = TSDB_CODE_SUCCESS; - if (!pInfo->valid) { + if (!pInfo->valid || terrno == TSDB_CODE_TSC_SQL_SYNTAX_ERROR) { + terrno = TSDB_CODE_SUCCESS; // clear the error number return tscSQLSyntaxErrMsg(tscGetErrorMsgPayload(pCmd), NULL, pInfo->pzErrMsg); } @@ -1249,7 +1250,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t tExprNode* pNode = NULL; SArray* colList = taosArrayInit(10, sizeof(SColIndex)); - int32_t ret = exprTreeFromSqlExpr(pCmd, &pNode, pItem->pNode, pQueryInfo->exprList, pQueryInfo, colList); + int32_t ret = exprTreeFromSqlExpr(pCmd, &pNode, pItem->pNode, pQueryInfo, colList, NULL); if (ret != TSDB_CODE_SUCCESS) { taosArrayDestroy(colList); tExprTreeDestroy(&pNode, NULL); @@ -1305,17 +1306,17 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t insertResultField(pQueryInfo, exprIndex, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, aliasName, NULL); int32_t slot = tscNumOfFields(pQueryInfo) - 1; - SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, slot); + SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, slot); if (pInfo->pSqlExpr == NULL) { SExprInfo* pArithExprInfo = calloc(1, sizeof(SExprInfo)); // arithmetic expression always return result in the format of double float - pArithExprInfo->bytes = sizeof(double); + pArithExprInfo->bytes = sizeof(double); pArithExprInfo->interBytes = sizeof(double); - pArithExprInfo->type = TSDB_DATA_TYPE_DOUBLE; + pArithExprInfo->type = TSDB_DATA_TYPE_DOUBLE; - int32_t ret = exprTreeFromSqlExpr(pCmd, &pArithExprInfo->pExpr, pItem->pNode, pQueryInfo->exprList, pQueryInfo, NULL); + int32_t ret = exprTreeFromSqlExpr(pCmd, &pArithExprInfo->pExpr, pItem->pNode, pQueryInfo, NULL, &pArithExprInfo->uid); if (ret != TSDB_CODE_SUCCESS) { tExprTreeDestroy(&pArithExprInfo->pExpr, NULL); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "invalid expression in select clause"); @@ -1372,7 +1373,7 @@ static void addPrimaryTsColIntoResult(SQueryInfo* pQueryInfo) { int32_t numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); tscAddSpecialColumnForSelect(pQueryInfo, numOfCols, TSDB_FUNC_PRJ, &index, pSchema, TSDB_COL_NORMAL); - SFieldSupInfo* pSupInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, numOfCols); + SInternalField* pSupInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, numOfCols); pSupInfo->visible = false; pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY; @@ -1469,7 +1470,7 @@ int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnLi } TAOS_FIELD f = tscCreateField(type, fieldName, bytes); - SFieldSupInfo* pInfo = tscFieldInfoInsert(&pQueryInfo->fieldsInfo, outputIndex, &f); + SInternalField* pInfo = tscFieldInfoInsert(&pQueryInfo->fieldsInfo, outputIndex, &f); pInfo->pSqlExpr = pSqlExpr; return TSDB_CODE_SUCCESS; @@ -3384,10 +3385,26 @@ static int32_t validateSQLExpr(SSqlCmd* pCmd, tSQLExpr* pExpr, SQueryInfo* pQuer tSQLExprItem item = {.pNode = pExpr, .aliasName = NULL}; - // sql function in selection clause, append sql function info in pSqlCmd structure sequentially + // sql function list in selection clause. + // Append the sqlExpr into exprList of pQueryInfo structure sequentially if (addExprAndResultField(pCmd, pQueryInfo, outputIndex, &item, false) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } + + // It is invalid in case of more than one sqlExpr, such as first(ts, k) - last(ts, k) + int32_t inc = (int32_t) tscSqlExprNumOfExprs(pQueryInfo) - outputIndex; + if (inc > 1) { + return TSDB_CODE_TSC_INVALID_SQL; + } + + // Not supported data type in arithmetic expression + for(int32_t i = 0; i < inc; ++i) { + SSqlExpr* p1 = tscSqlExprGet(pQueryInfo, i + outputIndex); + int16_t t = p1->resType; + if (t == TSDB_DATA_TYPE_BINARY || t == TSDB_DATA_TYPE_NCHAR || t == TSDB_DATA_TYPE_BOOL || t == TSDB_DATA_TYPE_TIMESTAMP) { + return TSDB_CODE_TSC_INVALID_SQL; + } + } } return TSDB_CODE_SUCCESS; @@ -4062,7 +4079,7 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE tExprNode* p = NULL; SArray* colList = taosArrayInit(10, sizeof(SColIndex)); - ret = exprTreeFromSqlExpr(pCmd, &p, p1, NULL, pQueryInfo, colList); + ret = exprTreeFromSqlExpr(pCmd, &p, p1, pQueryInfo, colList, NULL); SBufferWriter bw = tbufInitWriter(NULL, false); TRY(0) { @@ -4733,7 +4750,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { // validate the length of binary if ((pTagsSchema->type == TSDB_DATA_TYPE_BINARY || pTagsSchema->type == TSDB_DATA_TYPE_NCHAR) && - (pVarList->a[1].pVar.nLen + VARSTR_HEADER_SIZE) > pTagsSchema->bytes) { + varDataTLen(pAlterSQL->tagData.data) > pTagsSchema->bytes) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg14); } @@ -5259,26 +5276,6 @@ int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql) { return TSDB_CODE_SUCCESS; } -//void tscAddTimestampColumn(SQueryInfo* pQueryInfo, int16_t functionId, int16_t tableIndex) { -// // the first column not timestamp column, add it -// SSqlExpr* pExpr = NULL; -// if (tscSqlExprNumOfExprs(pQueryInfo) > 0) { -// pExpr = tscSqlExprGet(pQueryInfo, 0); -// } -// -// if (pExpr == NULL || pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX || pExpr->functionId != functionId) { -// SColumnIndex index = {tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX}; -// -// pExpr = tscSqlExprInsert(pQueryInfo, 0, functionId, &index, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE, false); -// pExpr->colInfo.flag = TSDB_COL_NORMAL; -// -// // NOTE: tag column does not add to source column list -// SColumnList ids = getColumnList(1, tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX); -// -// insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, "ts", pExpr); -// } -//} - void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex) { SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentObj->cmd, subClauseIndex); @@ -5335,7 +5332,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) { tscAddSpecialColumnForSelect(pQueryInfo, (int32_t)size, TSDB_FUNC_PRJ, &colIndex, pSchema, TSDB_COL_NORMAL); - SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, (int32_t)size); + SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, (int32_t)size); doLimitOutputNormalColOfGroupby(pInfo->pSqlExpr); pInfo->visible = false; } @@ -6380,19 +6377,19 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { return TSDB_CODE_SUCCESS; // Does not build query message here } -int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pSqlExpr, SArray* pExprInfo, SQueryInfo* pQueryInfo, SArray* pCols) { +int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pSqlExpr, SQueryInfo* pQueryInfo, SArray* pCols, int64_t *uid) { tExprNode* pLeft = NULL; tExprNode* pRight= NULL; if (pSqlExpr->pLeft != NULL) { - int32_t ret = exprTreeFromSqlExpr(pCmd, &pLeft, pSqlExpr->pLeft, pExprInfo, pQueryInfo, pCols); + int32_t ret = exprTreeFromSqlExpr(pCmd, &pLeft, pSqlExpr->pLeft, pQueryInfo, pCols, uid); if (ret != TSDB_CODE_SUCCESS) { return ret; } } if (pSqlExpr->pRight != NULL) { - int32_t ret = exprTreeFromSqlExpr(pCmd, &pRight, pSqlExpr->pRight, pExprInfo, pQueryInfo, pCols); + int32_t ret = exprTreeFromSqlExpr(pCmd, &pRight, pSqlExpr->pRight, pQueryInfo, pCols, uid); if (ret != TSDB_CODE_SUCCESS) { return ret; } @@ -6419,14 +6416,19 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pS strncpy((*pExpr)->pSchema->name, pSqlExpr->operand.z, pSqlExpr->operand.n); // set the input column data byte and type. - size_t size = taosArrayGetSize(pExprInfo); + size_t size = taosArrayGetSize(pQueryInfo->exprList); for (int32_t i = 0; i < size; ++i) { - SSqlExpr* p1 = taosArrayGetP(pExprInfo, i); + SSqlExpr* p1 = taosArrayGetP(pQueryInfo->exprList, i); if (strcmp((*pExpr)->pSchema->name, p1->aliasName) == 0) { - (*pExpr)->pSchema->type = (uint8_t)p1->resType; + (*pExpr)->pSchema->type = (uint8_t)p1->resType; (*pExpr)->pSchema->bytes = p1->resBytes; + + if (uid != NULL) { + *uid = p1->uid; + } + break; } } diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index 1e841c68fd..dfd707344c 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -145,10 +145,11 @@ static void tscInitCorVgroupInfo(SCMCorVgroupInfo *corVgroupInfo, SCMVgroupInfo corVgroupInfo->inUse = 0; corVgroupInfo->numOfEps = vgroupInfo->numOfEps; for (int32_t i = 0; i < corVgroupInfo->numOfEps; i++) { - strncpy(corVgroupInfo->epAddr[i].fqdn, vgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN); + corVgroupInfo->epAddr[i].fqdn = strdup(vgroupInfo->epAddr[i].fqdn); corVgroupInfo->epAddr[i].port = vgroupInfo->epAddr[i].port; } } + STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) { assert(pTableMetaMsg != NULL); @@ -162,9 +163,19 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size .numOfColumns = pTableMetaMsg->numOfColumns, }; - pTableMeta->id.tid = pTableMetaMsg->sid; + pTableMeta->id.tid = pTableMetaMsg->tid; pTableMeta->id.uid = pTableMetaMsg->uid; - pTableMeta->vgroupInfo = pTableMetaMsg->vgroup; + + SCMVgroupInfo* pVgroupInfo = &pTableMeta->vgroupInfo; + pVgroupInfo->numOfEps = pTableMetaMsg->vgroup.numOfEps; + pVgroupInfo->vgId = pTableMetaMsg->vgroup.vgId; + + for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { + SEpAddrMsg* pEpMsg = &pTableMetaMsg->vgroup.epAddr[i]; + + pVgroupInfo->epAddr[i].fqdn = strndup(pEpMsg->fqdn, tListLen(pEpMsg->fqdn)); + pVgroupInfo->epAddr[i].port = pEpMsg->port; + } tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMeta->vgroupInfo); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 40e050e6a4..a0841fa234 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -124,9 +124,11 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { pVgroupInfo->inUse = pEpSet->inUse; pVgroupInfo->numOfEps = pEpSet->numOfEps; for (int32_t i = 0; i < pVgroupInfo->numOfEps; i++) { - tstrncpy(pVgroupInfo->epAddr[i].fqdn, pEpSet->fqdn[i], TSDB_FQDN_LEN); + taosTFree(pVgroupInfo->epAddr[i].fqdn); + pVgroupInfo->epAddr[i].fqdn = strndup(pEpSet->fqdn[i], tListLen(pEpSet->fqdn[i])); pVgroupInfo->epAddr[i].port = pEpSet->port[i]; } + tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse); taosCorEndWrite(&pVgroupInfo->version); } @@ -1648,7 +1650,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscProcessTableMetaRsp(SSqlObj *pSql) { STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp; - pMetaMsg->sid = htonl(pMetaMsg->sid); + pMetaMsg->tid = htonl(pMetaMsg->tid); pMetaMsg->sversion = htons(pMetaMsg->sversion); pMetaMsg->tversion = htons(pMetaMsg->tversion); pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId); @@ -1658,9 +1660,9 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns); if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) && - (pMetaMsg->sid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) { + (pMetaMsg->tid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) { tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId, - pMetaMsg->sid, pMetaMsg->tableId); + pMetaMsg->tid, pMetaMsg->tableId); return TSDB_CODE_TSC_INVALID_VALUE; } @@ -1839,22 +1841,30 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { SSqlCmd* pCmd = &parent->cmd; for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) { STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i); - SVgroupsInfo * pVgroupInfo = (SVgroupsInfo *)pMsg; - pVgroupInfo->numOfVgroups = htonl(pVgroupInfo->numOfVgroups); - size_t size = sizeof(SCMVgroupInfo) * pVgroupInfo->numOfVgroups + sizeof(SVgroupsInfo); - pInfo->vgroupList = calloc(1, size); + SVgroupsMsg * pVgroupMsg = (SVgroupsMsg *) pMsg; + pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups); + + size_t size = sizeof(SCMVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg); + + size_t vgroupsz = sizeof(SCMVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo); + pInfo->vgroupList = calloc(1, vgroupsz); assert(pInfo->vgroupList != NULL); - memcpy(pInfo->vgroupList, pVgroupInfo, size); + pInfo->vgroupList->numOfVgroups = pVgroupMsg->numOfVgroups; for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) { //just init, no need to lock SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j]; - pVgroups->vgId = htonl(pVgroups->vgId); - assert(pVgroups->numOfEps >= 1); + + SCMVgroupMsg *vmsg = &pVgroupMsg->vgroups[j]; + pVgroups->vgId = htonl(vmsg->vgId); + pVgroups->numOfEps = vmsg->numOfEps; + + assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1); for (int32_t k = 0; k < pVgroups->numOfEps; ++k) { - pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port); + pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port); + pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn)); } } @@ -1890,7 +1900,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { pMetaMsg->numOfColumns = ntohs(pMetaMsg->numOfColumns); pSchema = pMetaMsg->schema; - pMetaMsg->sid = ntohs(pMetaMsg->sid); + pMetaMsg->tid = ntohs(pMetaMsg->tid); for (int i = 0; i < pMetaMsg->numOfColumns; ++i) { pSchema->bytes = htons(pSchema->bytes); pSchema++; @@ -1924,7 +1934,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { tscColumnListInsert(pQueryInfo->colList, &index); TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes); - SFieldSupInfo* pInfo = tscFieldInfoAppend(pFieldInfo, &f); + SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f); pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, pTableSchema[i].type, pTableSchema[i].bytes, pTableSchema[i].bytes, false); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 9799c9129b..3b291b2271 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -373,7 +373,7 @@ int taos_num_fields(TAOS_RES *res) { size_t numOfCols = tscNumOfFields(pQueryInfo); for(int32_t i = 0; i < numOfCols; ++i) { - SFieldSupInfo* pInfo = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i); + SInternalField* pInfo = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i); if (pInfo->visible) { num++; } @@ -409,8 +409,24 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { if (numOfCols == 0) { return NULL; } - - return pQueryInfo->fieldsInfo.pFields->pData; + + SFieldInfo *pFieldInfo = &pQueryInfo->fieldsInfo; + + if (pFieldInfo->final == NULL) { + TAOS_FIELD* f = calloc(pFieldInfo->numOfOutput, sizeof(TAOS_FIELD)); + + int32_t j = 0; + for(int32_t i = 0; i < pFieldInfo->numOfOutput; ++i) { + SInternalField* pField = tscFieldInfoGetInternalField(pFieldInfo, i); + if (pField->visible) { + f[j++] = pField->field; + } + } + + pFieldInfo->final = f; + } + + return pFieldInfo->final; } int taos_retrieve(TAOS_RES *res) { diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 61614b56fb..0f67911bbe 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -168,8 +168,8 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0); taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), true); - taosTFree(pTableMetaInfo->vgroupList); - + pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList); + tscSetRetryTimer(pStream, pStream->pSql, retryDelay); return; } @@ -275,7 +275,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf tscFreeSqlResult(pSql); taosTFree(pSql->pSubs); pSql->subState.numOfSub = 0; - taosTFree(pTableMetaInfo->vgroupList); + pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList); tscSetNextLaunchTimer(pStream, pSql); } } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 06e1efc9e4..6b615c3a9b 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -449,7 +449,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr SVgroupTableInfo info = {{0}}; for (int32_t m = 0; m < pvg->numOfVgroups; ++m) { if (tt->vgId == pvg->vgroups[m].vgId) { - info.vgInfo = pvg->vgroups[m]; + tscSCMVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]); break; } } @@ -1142,7 +1142,6 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code); static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj); -// TODO int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) { SSqlCmd * pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); @@ -1298,14 +1297,6 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0); int32_t code = TSDB_CODE_SUCCESS; - - // todo add test -// SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); -// if (pState == NULL) { -// code = TSDB_CODE_TSC_OUT_OF_MEMORY; -// goto _error; -// } - pSql->subState.numOfSub = pQueryInfo->numOfTables; bool hasEmptySub = false; @@ -1645,9 +1636,9 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p // data in from current vnode is stored in cache and disk uint32_t numOfRowsFromSubquery = (uint32_t)(trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num); - tscDebug("%p sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql, pSql, - pTableMetaInfo->vgroupList->vgroups[0].epAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId, - numOfRowsFromSubquery, idx); + SVgroupsInfo* vgroupsInfo = pTableMetaInfo->vgroupList; + tscDebug("%p sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql, pSql, + vgroupsInfo->vgroups[0].epAddr[0].fqdn, vgroupsInfo->vgroups[0].vgId, numOfRowsFromSubquery, idx); tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity); @@ -2022,7 +2013,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - SFieldSupInfo* pInfo = (SFieldSupInfo*) TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.pSupportInfo, columnIndex); + SInternalField* pInfo = (SInternalField*) TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, columnIndex); assert(pInfo->pSqlExpr != NULL); *bytes = pInfo->pSqlExpr->resBytes; @@ -2172,7 +2163,7 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) { size_t size = tscNumOfFields(pQueryInfo); for (int i = 0; i < size; ++i) { - SFieldSupInfo* pSup = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.pSupportInfo, i); + SInternalField* pSup = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); if (pSup->pSqlExpr != NULL) { tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i); } @@ -2182,7 +2173,7 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) { continue; } - TAOS_FIELD *pField = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.pFields, i); + TAOS_FIELD *pField = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); if (pRes->tsrow[i] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) { transferNcharData(pSql, i, pField); } diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 4c5dbb079f..47c2d35a75 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -77,6 +77,7 @@ int32_t tscInitRpc(const char *user, const char *secretEncrypt, void **pDnodeCon return 0; } + void taos_init_imp(void) { char temp[128]; @@ -124,8 +125,9 @@ void taos_init_imp(void) { double factor = (tscEmbedded == 0)? 2.0:4.0; tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor); - - if (tscNumOfThreads < 2) tscNumOfThreads = 2; + if (tscNumOfThreads < 2) { + tscNumOfThreads = 2; + } tscQhandle = taosInitScheduler(queueSize, tscNumOfThreads, "tsc"); if (NULL == tscQhandle) { @@ -140,7 +142,7 @@ void taos_init_imp(void) { int64_t refreshTime = 10; // 10 seconds by default if (tscMetaCache == NULL) { - tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta"); + tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta"); tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj"); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index a38be04ae0..7e1c378584 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -408,6 +408,24 @@ void tscFreeRegisteredSqlObj(void *pSql) { } } +void tscFreeTableMetaHelper(void *pTableMeta) { + STableMeta* p = (STableMeta*) pTableMeta; + + int32_t numOfEps = p->vgroupInfo.numOfEps; + assert(numOfEps >= 0 && numOfEps <= TSDB_MAX_REPLICA); + + for(int32_t i = 0; i < numOfEps; ++i) { + taosTFree(p->vgroupInfo.epAddr[i].fqdn); + } + + int32_t numOfEps1 = p->corVgroupInfo.numOfEps; + assert(numOfEps1 >= 0 && numOfEps1 <= TSDB_MAX_REPLICA); + + for(int32_t i = 0; i < numOfEps1; ++i) { + taosTFree(p->corVgroupInfo.epAddr[i].fqdn); + } +} + void tscFreeSqlObj(SSqlObj* pSql) { if (pSql == NULL || pSql->signature != pSql) { return; @@ -829,35 +847,30 @@ TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes) { return f; } -SFieldSupInfo* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField) { +SInternalField* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField) { assert(pFieldInfo != NULL); - taosArrayPush(pFieldInfo->pFields, pField); pFieldInfo->numOfOutput++; - struct SFieldSupInfo info = { + struct SInternalField info = { .pSqlExpr = NULL, .pArithExprInfo = NULL, .visible = true, }; - - return taosArrayPush(pFieldInfo->pSupportInfo, &info); + + info.field = *pField; + return taosArrayPush(pFieldInfo->internalField, &info); } -SFieldSupInfo* tscFieldInfoGetSupp(SFieldInfo* pFieldInfo, int32_t index) { - return TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, index); -} - -SFieldSupInfo* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* field) { - taosArrayInsert(pFieldInfo->pFields, index, field); +SInternalField* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* field) { pFieldInfo->numOfOutput++; - - struct SFieldSupInfo info = { + struct SInternalField info = { .pSqlExpr = NULL, .pArithExprInfo = NULL, .visible = true, }; - - return taosArrayInsert(pFieldInfo->pSupportInfo, index, &info); + + info.field = *field; + return taosArrayInsert(pFieldInfo->internalField, index, &info); } void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo) { @@ -891,29 +904,18 @@ void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) { } } -void tscFieldInfoCopy(SFieldInfo* dst, const SFieldInfo* src) { - dst->numOfOutput = src->numOfOutput; - - if (dst->pFields == NULL) { - dst->pFields = taosArrayClone(src->pFields); - } else { - taosArrayCopy(dst->pFields, src->pFields); - } - - if (dst->pSupportInfo == NULL) { - dst->pSupportInfo = taosArrayClone(src->pSupportInfo); - } else { - taosArrayCopy(dst->pSupportInfo, src->pSupportInfo); - } +SInternalField* tscFieldInfoGetInternalField(SFieldInfo* pFieldInfo, int32_t index) { + assert(index < pFieldInfo->numOfOutput); + return TARRAY_GET_ELEM(pFieldInfo->internalField, index); } TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index) { assert(index < pFieldInfo->numOfOutput); - return TARRAY_GET_ELEM(pFieldInfo->pFields, index); + return &((SInternalField*)TARRAY_GET_ELEM(pFieldInfo->internalField, index))->field; } int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) { - SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, index); + SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, index); assert(pInfo != NULL && pInfo->pSqlExpr != NULL); return pInfo->pSqlExpr->offset; @@ -960,10 +962,8 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) { return; } - taosArrayDestroy(pFieldInfo->pFields); - for(int32_t i = 0; i < pFieldInfo->numOfOutput; ++i) { - SFieldSupInfo* pInfo = taosArrayGet(pFieldInfo->pSupportInfo, i); + SInternalField* pInfo = taosArrayGet(pFieldInfo->internalField, i); if (pInfo->pArithExprInfo != NULL) { tExprTreeDestroy(&pInfo->pArithExprInfo->pExpr, NULL); @@ -971,7 +971,9 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) { } } - taosArrayDestroy(pFieldInfo->pSupportInfo); + taosArrayDestroy(pFieldInfo->internalField); + taosTFree(pFieldInfo->final); + memset(pFieldInfo, 0, sizeof(SFieldInfo)); } @@ -1615,11 +1617,8 @@ STableMetaInfo* tscGetTableMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, i } void tscInitQueryInfo(SQueryInfo* pQueryInfo) { - assert(pQueryInfo->fieldsInfo.pFields == NULL); - pQueryInfo->fieldsInfo.pFields = taosArrayInit(4, sizeof(TAOS_FIELD)); - - assert(pQueryInfo->fieldsInfo.pSupportInfo == NULL); - pQueryInfo->fieldsInfo.pSupportInfo = taosArrayInit(4, sizeof(SFieldSupInfo)); + assert(pQueryInfo->fieldsInfo.internalField == NULL); + pQueryInfo->fieldsInfo.internalField = taosArrayInit(4, sizeof(SInternalField)); assert(pQueryInfo->exprList == NULL); pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES); @@ -1682,8 +1681,14 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd) { void tscFreeVgroupTableInfo(SArray* pVgroupTables) { if (pVgroupTables != NULL) { - for (size_t i = 0; i < taosArrayGetSize(pVgroupTables); i++) { + size_t num = taosArrayGetSize(pVgroupTables); + for (size_t i = 0; i < num; i++) { SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i); + + for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { + taosTFree(pInfo->vgInfo.epAddr[j].fqdn); + } + taosArrayDestroy(pInfo->itemList); } taosArrayDestroy(pVgroupTables); @@ -1695,6 +1700,7 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool rem for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); + tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables); tscClearTableMetaInfo(pTableMetaInfo, removeFromCache); free(pTableMetaInfo); @@ -1727,13 +1733,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST pTableMetaInfo->pTableMeta = pTableMeta; if (vgroupList != NULL) { - size_t size = sizeof(SVgroupsInfo) + sizeof(SCMVgroupInfo) * vgroupList->numOfVgroups; - pTableMetaInfo->vgroupList = malloc(size); - if (pTableMetaInfo->vgroupList == NULL) { - return NULL; - } - - memcpy(pTableMetaInfo->vgroupList, vgroupList, size); + pTableMetaInfo->vgroupList = tscVgroupInfoClone(vgroupList); } pTableMetaInfo->tagColList = taosArrayInit(4, POINTER_BYTES); @@ -1762,8 +1762,7 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache); } - taosTFree(pTableMetaInfo->vgroupList); - + pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList); tscColumnListDestroy(pTableMetaInfo->tagColList); pTableMetaInfo->tagColList = NULL; } @@ -1831,54 +1830,23 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm return pNew; } -// current sql function is not direct output result, so create a dummy output field -static void doSetNewFieldInfo(SQueryInfo* pNewQueryInfo, SSqlExpr* pExpr) { - TAOS_FIELD f = {.type = (uint8_t)pExpr->resType, .bytes = pExpr->resBytes}; - tstrncpy(f.name, pExpr->aliasName, sizeof(f.name)); - - SFieldSupInfo* pInfo1 = tscFieldInfoAppend(&pNewQueryInfo->fieldsInfo, &f); - - pInfo1->pSqlExpr = pExpr; - pInfo1->visible = false; -} - static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pQueryInfo, SQueryInfo* pNewQueryInfo, int64_t uid) { int32_t numOfOutput = (int32_t)tscSqlExprNumOfExprs(pNewQueryInfo); if (numOfOutput == 0) { return; } - size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); - SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo; - - // set the field info in pNewQueryInfo object + // set the field info in pNewQueryInfo object according to sqlExpr information + size_t numOfExprs = tscSqlExprNumOfExprs(pNewQueryInfo); for (int32_t i = 0; i < numOfExprs; ++i) { - SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); + SSqlExpr* pExpr = tscSqlExprGet(pNewQueryInfo, i); - if (pExpr->uid == uid) { - if (i < pFieldInfo->numOfOutput) { - SFieldSupInfo* pInfo = tscFieldInfoGetSupp(pFieldInfo, i); - - if (pInfo->pSqlExpr != NULL) { - TAOS_FIELD* p = tscFieldInfoGetField(pFieldInfo, i); - assert(strcmp(p->name, pExpr->aliasName) == 0); - - SFieldSupInfo* pInfo1 = tscFieldInfoAppend(&pNewQueryInfo->fieldsInfo, p); - *pInfo1 = *pInfo; - } else { - assert(pInfo->pArithExprInfo != NULL); - doSetNewFieldInfo(pNewQueryInfo, pExpr); - } - } else { // it is a arithmetic column, does not have actual field for sqlExpr, so build it - doSetNewFieldInfo(pNewQueryInfo, pExpr); - } - } + TAOS_FIELD f = tscCreateField((int8_t) pExpr->resType, pExpr->aliasName, pExpr->resBytes); + SInternalField* pInfo1 = tscFieldInfoAppend(&pNewQueryInfo->fieldsInfo, &f); + pInfo1->pSqlExpr = pExpr; } - // make sure the the sqlExpr for each fields is correct - numOfExprs = tscSqlExprNumOfExprs(pNewQueryInfo); - - // update the pSqlExpr pointer in SFieldSupInfo according the field name + // update the pSqlExpr pointer in SInternalField according the field name // make sure the pSqlExpr point to the correct SqlExpr in pNewQueryInfo, not SqlExpr in pQueryInfo for (int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) { TAOS_FIELD* field = tscFieldInfoGetField(&pNewQueryInfo->fieldsInfo, f); @@ -1888,7 +1856,7 @@ static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pQueryInfo, SQueryInfo* p SSqlExpr* pExpr1 = tscSqlExprGet(pNewQueryInfo, k1); if (strcmp(field->name, pExpr1->aliasName) == 0) { // establish link according to the result field name - SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pNewQueryInfo->fieldsInfo, f); + SInternalField* pInfo = tscFieldInfoGetInternalField(&pNewQueryInfo->fieldsInfo, f); pInfo->pSqlExpr = pExpr1; matched = true; @@ -2403,3 +2371,58 @@ void tscClearSqlOwner(SSqlObj* pSql) { assert(taosCheckPthreadValid(pSql->owner)); atomic_store_64(&pSql->owner, 0); } + +SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) { + if (vgroupList == NULL) { + return NULL; + } + + size_t size = sizeof(SVgroupsInfo) + sizeof(SCMVgroupInfo) * vgroupList->numOfVgroups; + SVgroupsInfo* pNew = calloc(1, size); + if (pNew == NULL) { + return NULL; + } + + pNew->numOfVgroups = vgroupList->numOfVgroups; + + for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) { + SCMVgroupInfo* pNewVInfo = &pNew->vgroups[i]; + + SCMVgroupInfo* pvInfo = &vgroupList->vgroups[i]; + pNewVInfo->vgId = pvInfo->vgId; + pNewVInfo->numOfEps = pvInfo->numOfEps; + + for(int32_t j = 0; j < pvInfo->numOfEps; ++j) { + pNewVInfo->epAddr[j].fqdn = strdup(pvInfo->epAddr[j].fqdn); + pNewVInfo->epAddr[j].port = pvInfo->epAddr[j].port; + } + } + + return pNew; +} + +void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) { + if (vgroupList == NULL) { + return NULL; + } + + for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) { + SCMVgroupInfo* pVgroupInfo = &vgroupList->vgroups[i]; + + for(int32_t j = 0; j < pVgroupInfo->numOfEps; ++j) { + taosTFree(pVgroupInfo->epAddr[j].fqdn); + } + } + + taosTFree(vgroupList); + return NULL; +} + +void tscSCMVgroupInfoCopy(SCMVgroupInfo* dst, const SCMVgroupInfo* src) { + dst->vgId = src->vgId; + dst->numOfEps = src->numOfEps; + for(int32_t i = 0; i < dst->numOfEps; ++i) { + dst->epAddr[i].port = src->epAddr[i].port; + dst->epAddr[i].fqdn = strdup(src->epAddr[i].fqdn); + } +} diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index a5c5d4759b..4c6c2100e0 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -173,15 +173,15 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char return rpcRsp.code; } -void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid) { - dDebug("vgId:%d, sid:%d send config table msg to mnode", vgId, sid); +void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) { + dDebug("vgId:%d, tid:%d send config table msg to mnode", vgId, tid); int32_t contLen = sizeof(SDMConfigTableMsg); SDMConfigTableMsg *pMsg = rpcMallocCont(contLen); pMsg->dnodeId = htonl(dnodeGetDnodeId()); pMsg->vgId = htonl(vgId); - pMsg->sid = htonl(sid); + pMsg->tid = htonl(tid); SRpcMsg rpcMsg = {0}; rpcMsg.pCont = pMsg; @@ -194,18 +194,18 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid) { if (rpcRsp.code != 0) { rpcFreeCont(rpcRsp.pCont); - dError("vgId:%d, sid:%d failed to config table from mnode", vgId, sid); + dError("vgId:%d, tid:%d failed to config table from mnode", vgId, tid); return NULL; } else { - dInfo("vgId:%d, sid:%d config table msg is received", vgId, sid); + dInfo("vgId:%d, tid:%d config table msg is received", vgId, tid); // delete this after debug finished SMDCreateTableMsg *pTable = rpcRsp.pCont; int16_t numOfColumns = htons(pTable->numOfColumns); int16_t numOfTags = htons(pTable->numOfTags); - int32_t sid = htonl(pTable->sid); + int32_t tableId = htonl(pTable->tid); uint64_t uid = htobe64(pTable->uid); - dInfo("table:%s, numOfColumns:%d numOfTags:%d sid:%d uid:%" PRIu64, pTable->tableId, numOfColumns, numOfTags, sid, uid); + dInfo("table:%s, numOfColumns:%d numOfTags:%d tid:%d uid:%" PRIu64, pTable->tableId, numOfColumns, numOfTags, tableId, uid); return rpcRsp.pCont; } diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 654e35cad5..e61158ef30 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -212,7 +212,8 @@ static void *dnodeProcessReadQueue(void *param) { } else { if (code == TSDB_CODE_QRY_HAS_RSP) { dnodeSendRpcReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code); - } else { // code == TSDB_CODE_NOT_READY, do not return msg to client + } else { // code == TSDB_CODE_QRY_NOT_READY, do not return msg to client + assert(pReadMsg->rpcMsg.handle == NULL || (pReadMsg->rpcMsg.handle != NULL && pReadMsg->rpcMsg.msgType == 5)); dnodeDispatchNonRspMsg(pVnode, pReadMsg, code); } } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 83d2a4ad9c..e84545be17 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -49,7 +49,7 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp); void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet); -void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid); +void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid); void *dnodeAllocateVnodeWqueue(void *pVnode); void dnodeFreeVnodeWqueue(void *queue); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index be66c76d71..600347c44f 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -182,10 +182,16 @@ extern char *taosMsg[]; #pragma pack(push, 1) +// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta typedef struct { char fqdn[TSDB_FQDN_LEN]; uint16_t port; -} SEpAddr; +} SEpAddrMsg; + +typedef struct { + char* fqdn; + uint16_t port; +} SEpAddr1; typedef struct { int32_t numOfVnodes; @@ -245,7 +251,7 @@ typedef struct { int8_t tableType; int16_t numOfColumns; int16_t numOfTags; - int32_t sid; + int32_t tid; int32_t sversion; int32_t tversion; int32_t tagDataLen; @@ -354,7 +360,7 @@ typedef struct { typedef struct { int32_t contLen; int32_t vgId; - int32_t sid; + int32_t tid; uint64_t uid; char tableId[TSDB_TABLE_FNAME_LEN]; } SMDDropTableMsg; @@ -401,6 +407,7 @@ typedef struct SExprInfo { int16_t bytes; int16_t type; int32_t interBytes; + int64_t uid; } SExprInfo; typedef struct SColumnFilterInfo { @@ -662,16 +669,27 @@ typedef struct SCMSTableVgroupMsg { } SCMSTableVgroupMsg, SCMSTableVgroupRspMsg; typedef struct { - int32_t vgId; - int8_t numOfEps; - SEpAddr epAddr[TSDB_MAX_REPLICA]; + int32_t vgId; + int8_t numOfEps; + SEpAddr1 epAddr[TSDB_MAX_REPLICA]; } SCMVgroupInfo; +typedef struct { + int32_t vgId; + int8_t numOfEps; + SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; +} SCMVgroupMsg; + typedef struct { int32_t numOfVgroups; SCMVgroupInfo vgroups[]; } SVgroupsInfo; +typedef struct { + int32_t numOfVgroups; + SCMVgroupMsg vgroups[]; +} SVgroupsMsg; + typedef struct STableMetaMsg { int32_t contLen; char tableId[TSDB_TABLE_FNAME_LEN]; // table id @@ -682,9 +700,9 @@ typedef struct STableMetaMsg { int16_t numOfColumns; int16_t sversion; int16_t tversion; - int32_t sid; + int32_t tid; uint64_t uid; - SCMVgroupInfo vgroup; + SCMVgroupMsg vgroup; SSchema schema[]; } STableMetaMsg; @@ -730,7 +748,7 @@ typedef struct { typedef struct { int32_t dnodeId; int32_t vgId; - int32_t sid; + int32_t tid; } SDMConfigTableMsg; typedef struct { diff --git a/src/mnode/inc/mnodeDef.h b/src/mnode/inc/mnodeDef.h index 8a2947dd18..4bc840f026 100644 --- a/src/mnode/inc/mnodeDef.h +++ b/src/mnode/inc/mnodeDef.h @@ -115,7 +115,7 @@ typedef struct { uint64_t suid; int64_t createdTime; int32_t numOfColumns; //used by normal table - int32_t sid; + int32_t tid; int32_t vgId; int32_t sqlLen; int8_t updateEnd[4]; diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 8545e8790d..82a062169a 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -303,7 +303,7 @@ static int32_t mnodeChildTableActionRestored() { SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId); if (pVgroup == NULL) { - mError("ctable:%s, failed to get vgId:%d sid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->sid); + mError("ctable:%s, failed to get vgId:%d tid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->tid); pTable->vgId = 0; SSdbOper desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb}; sdbDeleteRow(&desc); @@ -314,7 +314,7 @@ static int32_t mnodeChildTableActionRestored() { if (strcmp(pVgroup->dbName, pDb->name) != 0) { mError("ctable:%s, db:%s not match with vgId:%d db:%s sid:%d, discard it", - pTable->info.tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid); + pTable->info.tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->tid); pTable->vgId = 0; SSdbOper desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb}; sdbDeleteRow(&desc); @@ -771,8 +771,8 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) { return mnodeProcessDropSuperTableMsg(pMsg); } else { SChildTableObj *pCTable = (SChildTableObj *)pMsg->pTable; - mInfo("app:%p:%p, table:%s, start to drop ctable, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, - pDrop->tableId, pCTable->vgId, pCTable->sid, pCTable->uid); + mInfo("app:%p:%p, table:%s, start to drop ctable, vgId:%d tid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, + pDrop->tableId, pCTable->vgId, pCTable->tid, pCTable->uid); return mnodeProcessDropChildTableMsg(pMsg); } } @@ -1476,13 +1476,14 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { int32_t numOfTable = htonl(pInfo->numOfTables); // reserve space - int32_t contLen = sizeof(SCMSTableVgroupRspMsg) + 32 * sizeof(SCMVgroupInfo) + sizeof(SVgroupsInfo); + int32_t contLen = sizeof(SCMSTableVgroupRspMsg) + 32 * sizeof(SCMVgroupMsg) + sizeof(SVgroupsMsg); for (int32_t i = 0; i < numOfTable; ++i) { char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN) * i; SSuperTableObj *pTable = mnodeGetSuperTable(stableName); if (pTable != NULL && pTable->vgHash != NULL) { - contLen += (taosHashGetSize(pTable->vgHash) * sizeof(SCMVgroupInfo) + sizeof(SVgroupsInfo)); + contLen += (taosHashGetSize(pTable->vgHash) * sizeof(SCMVgroupMsg) + sizeof(SVgroupsMsg)); } + mnodeDecTableRef(pTable); } @@ -1510,12 +1511,12 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { // even this super table has no corresponding table, still return pRsp->numOfTables++; - SVgroupsInfo *pVgroupInfo = (SVgroupsInfo *)msg; - pVgroupInfo->numOfVgroups = 0; + SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg; + pVgroupMsg->numOfVgroups = 0; - msg += sizeof(SVgroupsInfo); + msg += sizeof(SVgroupsMsg); } else { - SVgroupsInfo *pVgroupInfo = (SVgroupsInfo *)msg; + SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg; SHashMutableIterator *pIter = taosHashCreateIter(pTable->vgHash); int32_t vgSize = 0; @@ -1524,15 +1525,17 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { SVgObj * pVgroup = mnodeGetVgroup(*pVgId); if (pVgroup == NULL) continue; - pVgroupInfo->vgroups[vgSize].vgId = htonl(pVgroup->vgId); + pVgroupMsg->vgroups[vgSize].vgId = htonl(pVgroup->vgId); + pVgroupMsg->vgroups[vgSize].numOfEps = 0; + for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) { SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode; if (pDnode == NULL) break; - tstrncpy(pVgroupInfo->vgroups[vgSize].epAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN); - pVgroupInfo->vgroups[vgSize].epAddr[vn].port = htons(pDnode->dnodePort); + tstrncpy(pVgroupMsg->vgroups[vgSize].epAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN); + pVgroupMsg->vgroups[vgSize].epAddr[vn].port = htons(pDnode->dnodePort); - pVgroupInfo->vgroups[vgSize].numOfEps++; + pVgroupMsg->vgroups[vgSize].numOfEps++; } vgSize++; @@ -1542,10 +1545,10 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { taosHashDestroyIter(pIter); mnodeDecTableRef(pTable); - pVgroupInfo->numOfVgroups = htonl(vgSize); + pVgroupMsg->numOfVgroups = htonl(vgSize); // one table is done, try the next table - msg += sizeof(SVgroupsInfo) + vgSize * sizeof(SCMVgroupInfo); + msg += sizeof(SVgroupsMsg) + vgSize * sizeof(SCMVgroupMsg); pRsp->numOfTables++; } } @@ -1595,7 +1598,7 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO pCreate->vgId = htonl(pTable->vgId); pCreate->tableType = pTable->info.type; pCreate->createdTime = htobe64(pTable->createdTime); - pCreate->sid = htonl(pTable->sid); + pCreate->tid = htonl(pTable->tid); pCreate->sqlDataLen = htonl(pTable->sqlLen); pCreate->uid = htobe64(pTable->uid); @@ -1644,7 +1647,7 @@ static int32_t mnodeDoCreateChildTableFp(SMnodeMsg *pMsg) { assert(pTable); mDebug("app:%p:%p, table:%s, created in mnode, vgId:%d sid:%d, uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, - pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid); + pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid); SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, pTable); @@ -1686,7 +1689,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } else { mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg, - pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code)); + pTable->info.tableId, pTable->tid, pTable->uid, tstrerror(code)); SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsChildTableSdb}; sdbDeleteRow(&desc); return code; @@ -1710,7 +1713,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { pTable->info.tableId = strdup(pCreate->tableId); pTable->createdTime = taosGetTimestampMs(); - pTable->sid = tid; + pTable->tid = tid; pTable->vgId = pVgroup->vgId; if (pTable->info.type == TSDB_CHILD_TABLE) { @@ -1724,7 +1727,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { } pTable->suid = pMsg->pSTable->uid; - pTable->uid = (((uint64_t)pTable->vgId) << 48) + ((((uint64_t)pTable->sid) & ((1ul << 24) - 1ul)) << 24) + + pTable->uid = (((uint64_t)pTable->vgId) << 48) + ((((uint64_t)pTable->tid) & ((1ul << 24) - 1ul)) << 24) + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); pTable->superTable = pMsg->pSTable; } else { @@ -1732,7 +1735,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { int64_t us = taosGetTimestampUs(); pTable->uid = (us << 24) + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); } else { - pTable->uid = (((uint64_t)pTable->vgId) << 48) + ((((uint64_t)pTable->sid) & ((1ul << 24) - 1ul)) << 24) + + pTable->uid = (((uint64_t)pTable->vgId) << 48) + ((((uint64_t)pTable->tid) & ((1ul << 24) - 1ul)) << 24) + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); } @@ -1789,7 +1792,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { tstrerror(code)); } else { mDebug("app:%p:%p, table:%s, allocated in vgroup, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, - pTable->info.tableId, pVgroup->vgId, pTable->sid, pTable->uid); + pTable->info.tableId, pVgroup->vgId, pTable->tid, pTable->uid); } return code; @@ -1807,8 +1810,8 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { if (pMsg->retry == 0) { if (pMsg->pTable == NULL) { SVgObj *pVgroup = NULL; - int32_t sid = 0; - code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &sid); + int32_t tid = 0; + code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &tid); if (code != TSDB_CODE_SUCCESS) { mDebug("app:%p:%p, table:%s, failed to get available vgroup, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId, tstrerror(code)); @@ -1822,7 +1825,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { pMsg->pVgroup = pVgroup; mnodeIncVgroupRef(pVgroup); - return mnodeDoCreateChildTable(pMsg, sid); + return mnodeDoCreateChildTable(pMsg, tid); } } else { if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pCreate->tableId); @@ -1852,13 +1855,13 @@ static int32_t mnodeSendDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) { tstrncpy(pDrop->tableId, pTable->info.tableId, TSDB_TABLE_FNAME_LEN); pDrop->vgId = htonl(pTable->vgId); pDrop->contLen = htonl(sizeof(SMDDropTableMsg)); - pDrop->sid = htonl(pTable->sid); + pDrop->tid = htonl(pTable->tid); pDrop->uid = htobe64(pTable->uid); SRpcEpSet epSet = mnodeGetEpSetFromVgroup(pMsg->pVgroup); mInfo("app:%p:%p, ctable:%s, send drop ctable msg, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, - pDrop->tableId, pTable->vgId, pTable->sid, pTable->uid); + pDrop->tableId, pTable->vgId, pTable->tid, pTable->uid); SRpcMsg rpcMsg = { .ahandle = pMsg, @@ -2097,7 +2100,7 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) { SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; pMeta->uid = htobe64(pTable->uid); - pMeta->sid = htonl(pTable->sid); + pMeta->tid = htonl(pTable->tid); pMeta->precision = pDb->cfg.precision; pMeta->tableType = pTable->info.type; tstrncpy(pMeta->tableId, pTable->info.tableId, TSDB_TABLE_FNAME_LEN); @@ -2137,7 +2140,7 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) { pMeta->vgroup.vgId = htonl(pMsg->pVgroup->vgId); mDebug("app:%p:%p, table:%s, uid:%" PRIu64 " table meta is retrieved, vgId:%d sid:%d", pMsg->rpcMsg.ahandle, pMsg, - pTable->info.tableId, pTable->uid, pTable->vgId, pTable->sid); + pTable->info.tableId, pTable->uid, pTable->vgId, pTable->tid); return TSDB_CODE_SUCCESS; } @@ -2289,11 +2292,11 @@ static void mnodeDropAllChildTablesInStable(SSuperTableObj *pStable) { } #if 0 -static SChildTableObj* mnodeGetTableByPos(int32_t vnode, int32_t sid) { +static SChildTableObj* mnodeGetTableByPos(int32_t vnode, int32_t tid) { SVgObj *pVgroup = mnodeGetVgroup(vnode); if (pVgroup == NULL) return NULL; - SChildTableObj *pTable = pVgroup->tableList[sid - 1]; + SChildTableObj *pTable = pVgroup->tableList[tid - 1]; mnodeIncTableRef((STableObj *)pTable); mnodeDecVgroupRef(pVgroup); @@ -2341,12 +2344,12 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) { assert(pTable); mInfo("app:%p:%p, table:%s, drop table rsp received, vgId:%d sid:%d uid:%" PRIu64 ", thandle:%p result:%s", - mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid, + mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); if (rpcMsg->code != TSDB_CODE_SUCCESS) { mError("app:%p:%p, table:%s, failed to drop in dnode, vgId:%d sid:%d uid:%" PRIu64 ", reason:%s", - mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid, + mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid, tstrerror(rpcMsg->code)); dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code); return; @@ -2384,7 +2387,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { // If the table is deleted by another thread during creation, stop creating and send drop msg to vnode if (sdbCheckRowDeleted(tsChildTableSdb, pTable)) { mDebug("app:%p:%p, table:%s, create table rsp received, but a deleting opertion incoming, vgId:%d sid:%d uid:%" PRIu64, - mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid); + mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid); // if the vgroup is already dropped from hash, it can't be accquired by pTable->vgId // so the refCount of vgroup can not be decreased @@ -2419,13 +2422,13 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { if (mnodeMsg->retry++ < 10) { mDebug("app:%p:%p, table:%s, create table rsp received, need retry, times:%d vgId:%d sid:%d uid:%" PRIu64 " result:%s thandle:%p", - mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->retry, pTable->vgId, pTable->sid, + mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->retry, pTable->vgId, pTable->tid, pTable->uid, tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle); dnodeDelayReprocessMnodeWriteMsg(mnodeMsg); } else { mError("app:%p:%p, table:%s, failed to create in dnode, vgId:%d sid:%d uid:%" PRIu64 ", result:%s thandle:%p", - mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid, + mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->tid, pTable->uid, tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle); SSdbOper oper = {.type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, .pObj = pTable}; @@ -2680,7 +2683,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows // tid pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t*) pWrite = pTable->sid; + *(int32_t*) pWrite = pTable->tid; cols++; //vgid diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 28e4d17920..5084f1276a 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -793,12 +793,12 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { int32_t idPoolSize = taosIdPoolMaxSize(pVgroup->idPool); - if (pTable->sid > idPoolSize) { + if (pTable->tid > idPoolSize) { mnodeAllocVgroupIdPool(pVgroup); } - if (pTable->sid >= 1) { - taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid); + if (pTable->tid >= 1) { + taosIdPoolMarkStatus(pVgroup->idPool, pTable->tid); pVgroup->numOfTables++; // The create vgroup message may be received later than the create table message // and the writing order in sdb is therefore uncertain @@ -808,8 +808,8 @@ void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { } void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { - if (pTable->sid >= 1) { - taosFreeId(pVgroup->idPool, pTable->sid); + if (pTable->tid >= 1) { + taosFreeId(pVgroup->idPool, pTable->tid); pVgroup->numOfTables--; // The create vgroup message may be received later than the create table message // and the writing order in sdb is therefore uncertain diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 33b724e434..f392644e67 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -213,6 +213,7 @@ typedef struct SQInfo { void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; pthread_mutex_t lock; // used to synchronize the rsp/query threads + tsem_t ready; int32_t dataReady; // denote if query result is ready or not void* rspContext; // response context } SQInfo; diff --git a/src/query/inc/qSqlparser.h b/src/query/inc/qSqlparser.h index d6664577a3..bc8f9a5e23 100644 --- a/src/query/inc/qSqlparser.h +++ b/src/query/inc/qSqlparser.h @@ -276,8 +276,6 @@ tSQLExpr *tSQLExprNodeClone(tSQLExpr *pExpr); SAlterTableSQL *tAlterTableSQLElems(SStrToken *pMeterName, tFieldList *pCols, tVariantList *pVals, int32_t type); -tSQLExprListList *tSQLListListAppend(tSQLExprListList *pList, tSQLExprList *pExprList); - void destroyAllSelectClause(SSubclauseInfo *pSql); void doDestroyQuerySql(SQuerySQL *pSql); diff --git a/src/query/src/qAst.c b/src/query/src/qAst.c index 63411aaf3f..893105e44a 100644 --- a/src/query/src/qAst.c +++ b/src/query/src/qAst.c @@ -646,9 +646,7 @@ static bool filterItem(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *p } // handle the leaf node - assert(pLeft->nodeType == TSQL_NODE_COL && pRight->nodeType == TSQL_NODE_VALUE); param->setupInfoFn(pExpr, param->pExtInfo); - return param->nodeFilterFn(pItem, pExpr->_node.info); } @@ -769,6 +767,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S assert(taosArrayGetSize(result) == 0); tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param); } + return; } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 869e2dd7ed..3cb8db9faf 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6227,7 +6227,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou // NOTE: pTableCheckInfo need to update the query time range and the lastKey info pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo)); pQInfo->dataReady = QUERY_RESULT_NOT_READY; + pQInfo->rspContext = NULL; pthread_mutex_init(&pQInfo->lock, NULL); + tsem_init(&pQInfo->ready, 0, 0); pQuery->pos = -1; pQuery->window = pQueryMsg->window; @@ -6692,12 +6694,14 @@ static bool doBuildResCheck(SQInfo* pQInfo) { pQInfo->dataReady = QUERY_RESULT_READY; buildRes = (pQInfo->rspContext != NULL); - pthread_mutex_unlock(&pQInfo->lock); - - // clear qhandle owner + // clear qhandle owner, it must be in the secure area. other thread may run ahead before current, after it is + // put into task to be executed. assert(pQInfo->owner == taosGetPthreadId()); pQInfo->owner = 0; + pthread_mutex_unlock(&pQInfo->lock); + + tsem_post(&pQInfo->ready); return buildRes; } @@ -6761,18 +6765,24 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex SQInfo *pQInfo = (SQInfo *)qinfo; if (pQInfo == NULL || !isValidQInfo(pQInfo)) { + qError("QInfo:%p invalid qhandle", pQInfo); return TSDB_CODE_QRY_INVALID_QHANDLE; } *buildRes = false; - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; if (IS_QUERY_KILLED(pQInfo)) { qDebug("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); return pQInfo->code; } int32_t code = TSDB_CODE_SUCCESS; + +#if 0 + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + pthread_mutex_lock(&pQInfo->lock); + assert(pQInfo->rspContext == NULL); + if (pQInfo->dataReady == QUERY_RESULT_READY) { *buildRes = true; qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%"PRId64", code:%d", pQInfo, pQuery->rowSize, pQuery->rec.rows, @@ -6781,10 +6791,17 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex *buildRes = false; qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo); pQInfo->rspContext = pRspContext; + assert(pQInfo->rspContext != NULL); } code = pQInfo->code; pthread_mutex_unlock(&pQInfo->lock); +#else + tsem_wait(&pQInfo->ready); + *buildRes = true; + code = pQInfo->code; +#endif + return code; } diff --git a/src/query/src/qParserImpl.c b/src/query/src/qParserImpl.c index 33237a58c2..7e8128f200 100644 --- a/src/query/src/qParserImpl.c +++ b/src/query/src/qParserImpl.c @@ -130,13 +130,15 @@ tSQLExpr *tSQLExprIdValueCreate(SStrToken *pToken, int32_t optrType) { tVariantCreate(&pSQLExpr->val, pToken); pSQLExpr->nSQLOptr = optrType; } else if (optrType == TK_NOW) { - // default use microsecond + // use microsecond by default pSQLExpr->val.i64Key = taosGetTimestamp(TSDB_TIME_PRECISION_MICRO); pSQLExpr->val.nType = TSDB_DATA_TYPE_BIGINT; pSQLExpr->nSQLOptr = TK_TIMESTAMP; // TK_TIMESTAMP used to denote the time value is in microsecond } else if (optrType == TK_VARIABLE) { int32_t ret = parseAbsoluteDuration(pToken->z, pToken->n, &pSQLExpr->val.i64Key); - UNUSED(ret); + if (ret != TSDB_CODE_SUCCESS) { + terrno = TSDB_CODE_TSC_SQL_SYNTAX_ERROR; + } pSQLExpr->val.nType = TSDB_DATA_TYPE_BIGINT; pSQLExpr->nSQLOptr = TK_TIMESTAMP; @@ -148,6 +150,7 @@ tSQLExpr *tSQLExprIdValueCreate(SStrToken *pToken, int32_t optrType) { pSQLExpr->nSQLOptr = optrType; } + return pSQLExpr; } @@ -532,26 +535,6 @@ SQuerySQL *tSetQuerySQLElems(SStrToken *pSelectToken, tSQLExprList *pSelection, return pQuery; } -tSQLExprListList *tSQLListListAppend(tSQLExprListList *pList, tSQLExprList *pExprList) { - if (pList == NULL) pList = calloc(1, sizeof(tSQLExprListList)); - - if (pList->nAlloc <= pList->nList) { // - pList->nAlloc = (pList->nAlloc << 1) + 4; - pList->a = realloc(pList->a, pList->nAlloc * sizeof(pList->a[0])); - if (pList->a == 0) { - pList->nList = pList->nAlloc = 0; - return pList; - } - } - assert(pList->a != 0); - - if (pExprList) { - pList->a[pList->nList++] = pExprList; - } - - return pList; -} - void doDestroyQuerySql(SQuerySQL *pQuerySql) { if (pQuerySql == NULL) { return; diff --git a/src/query/src/qPercentile.c b/src/query/src/qPercentile.c index 3a8be781d5..ab9ffb7bcb 100644 --- a/src/query/src/qPercentile.c +++ b/src/query/src/qPercentile.c @@ -365,7 +365,7 @@ void tMemBucketDestroy(tMemBucket *pBucket) { taosTFree(pBucket); } -void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) { +void tMemBucketUpdateBoundingBox(MinMaxEntry *r, const char *data, int32_t dataType) { switch (dataType) { case TSDB_DATA_TYPE_INT: { int32_t val = *(int32_t *)data; diff --git a/src/query/src/qSyntaxtreefunction.c b/src/query/src/qSyntaxtreefunction.c index 2104edfd91..7f7fca2c1e 100644 --- a/src/query/src/qSyntaxtreefunction.c +++ b/src/query/src/qSyntaxtreefunction.c @@ -1247,7 +1247,10 @@ _bi_consumer_fn_t tGetBiConsumerFn(int32_t leftType, int32_t rightType, int32_t case TSDB_BINARY_OP_REMAINDER: return rem_function_arraylist[leftType][rightType]; default: + assert(0); return NULL; } + + assert(0); return NULL; } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 63231bb1fa..6e9088d9fb 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -412,7 +412,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) { rpcLockConn(pConn); if ( pConn->inType == 0 || pConn->user[0] == 0 ) { - tDebug("%s, connection is already released, rsp wont be sent", pConn->info); + tError("%s, connection is already released, rsp wont be sent", pConn->info); rpcUnlockConn(pConn); rpcFreeCont(pMsg->pCont); rpcDecRef(pRpc); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 564d7f5db5..f3bd91f038 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -239,7 +239,7 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) { return NULL; } - if (tsdbInitTableCfg(pCfg, pMsg->tableType, htobe64(pMsg->uid), htonl(pMsg->sid)) < 0) goto _err; + if (tsdbInitTableCfg(pCfg, pMsg->tableType, htobe64(pMsg->uid), htonl(pMsg->tid)) < 0) goto _err; if (tdInitTSchemaBuilder(&schemaBuilder, htonl(pMsg->sversion)) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _err; diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 11e094fe98..11bf569a40 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -14,7 +14,8 @@ */ #define _DEFAULT_SOURCE -//#include +#define _NON_BLOCKING_RETRIEVE 0 + #include "os.h" #include "tglobal.h" @@ -92,12 +93,12 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pRead) { } } -static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) { +static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) { SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->pCont = qhandle; pRead->contLen = 0; - pRead->rpcMsg.handle = NULL; + pRead->rpcMsg.ahandle = ahandle; atomic_add_fetch_32(&pVnode->refCount, 1); @@ -105,14 +106,23 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) { taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); } -static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, bool *freeHandle) { +/** + * + * @param pRet response message object + * @param pVnode the vnode object + * @param handle qhandle for executing query + * @param freeHandle free qhandle or not + * @param ahandle sqlObj address at client side + * @return + */ +static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, bool *freeHandle, void *ahandle) { bool continueExec = false; int32_t code = TSDB_CODE_SUCCESS; if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { if (continueExec) { *freeHandle = false; - vnodePutItemIntoReadQueue(pVnode, handle); + vnodePutItemIntoReadQueue(pVnode, handle, ahandle); pRet->qhandle = *handle; } else { *freeHandle = true; @@ -190,7 +200,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { // current connect is broken if (code == TSDB_CODE_SUCCESS) { handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t)pQInfo); - if (handle == NULL) { // failed to register qhandle, todo add error test case + if (handle == NULL) { // failed to register qhandle pRsp->code = terrno; terrno = 0; vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo, @@ -216,7 +226,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (handle != NULL) { vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle); - vnodePutItemIntoReadQueue(pVnode, handle); + vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcMsg.ahandle); } } else { assert(pCont != NULL); @@ -224,6 +234,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle); + +#if _NON_BLOCKING_RETRIEVE bool freehandle = false; bool buildRes = qTableQuery(*qhandle); // do execute query @@ -237,11 +249,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { pReadMsg->rpcMsg.handle); // set the real rsp error code - pReadMsg->rpcMsg.code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle); + pReadMsg->rpcMsg.code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle, pReadMsg->rpcMsg.ahandle); // NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client code = TSDB_CODE_QRY_HAS_RSP; } else { + void* h1 = qGetResultRetrieveMsg(*qhandle); + assert(h1 == NULL); + freehandle = qQueryCompleted(*qhandle); } @@ -250,6 +265,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (freehandle || (!buildRes)) { qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle); } +#else + qTableQuery(*qhandle); // do execute query + qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, false); +#endif } return code; @@ -309,12 +328,18 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); freeHandle = true; } else { // result is not ready, return immediately + assert(buildRes == true); +#if _NON_BLOCKING_RETRIEVE if (!buildRes) { + assert(pReadMsg->rpcMsg.handle != NULL); + qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false); return TSDB_CODE_QRY_NOT_READY; } +#endif - code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle); + // ahandle is the sqlObj pointer + code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcMsg.ahandle); } // If qhandle is not added into vread queue, the query should be completed already or paused with error. diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 0966192c94..492a396d93 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -151,7 +151,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet int32_t code = TSDB_CODE_SUCCESS; vDebug("vgId:%d, table:%s, start to drop", pVnode->vgId, pTable->tableId); - STableId tableId = {.uid = htobe64(pTable->uid), .tid = htonl(pTable->sid)}; + STableId tableId = {.uid = htobe64(pTable->uid), .tid = htonl(pTable->tid)}; if (tsdbDropTable(pVnode->tsdb, tableId) < 0) code = terrno; diff --git a/tests/script/general/parser/col_arithmetic_operation.sim b/tests/script/general/parser/col_arithmetic_operation.sim index 132ecf4342..3208df95e4 100644 --- a/tests/script/general/parser/col_arithmetic_operation.sim +++ b/tests/script/general/parser/col_arithmetic_operation.sim @@ -91,4 +91,60 @@ endi sql_error select max(c2*2) from $tb sql_error select max(c1-c2) from $tb +print =====================> td-1764 +sql select sum(c1)/count(*), sum(c1) as b, count(*) as b from $stb interval(1y) +if $rows != 1 then + return -1 +endi + +if $data00 != @18-01-01 00:00:00.000@ then + return -1 +endi + +if $data01 != 2.250000000 then + return -1 +endi + +if $data02 != 225000 then + return -1 +endi + +sql select first(c1) - last(c1), first(c1) as b, last(c1) as b, min(c1) - max(c1), spread(c1) from ca_stb0 interval(1y) +if $rows != 1 then + return -1 +endi + +if $data00 != @18-01-01 00:00:00.000@ then + return -1 +endi + +if $data01 != -9.000000000 then + return -1 +endi + +if $data02 != 0 then + return -1 +endi + +if $data03 != 9 then + return -1 +endi + +if $data04 != -9.000000000 then + return -1 +endi + +if $data05 != 9.000000000 then + return -1 +endi + +sql_error select first(c1, c2) - last(c1, c2) from stb interval(1y) +sql_error select first(ts) - last(ts) from stb interval(1y) +sql_error select top(c1, 2) - last(c1) from stb; +sql_error select stddev(c1) - last(c1) from stb; +sql_error select diff(c1) - last(c1) from stb; +sql_error select first(c7) - last(c7) from stb; +sql_error select first(c8) - last(c8) from stb; +sql_error select first(c9) - last(c9) from stb; + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/general/parser/constCol.sim b/tests/script/general/parser/constCol.sim index 13b4455779..7ae496f1ac 100644 --- a/tests/script/general/parser/constCol.sim +++ b/tests/script/general/parser/constCol.sim @@ -353,4 +353,36 @@ sql_error select from t1 sql_error select abc from t1 sql_error select abc as tu from t1 +print ========================> td-1756 +sql_error select * from t1 where ts>now-1y +sql_error select * from t1 where ts>now-1n + +print ========================> td-1752 +sql select * from db.st2 where t2 < 200 and t2 is not null; +if $rows != 1 then + return -1 +endi + +if $data00 != @19-12-09 16:27:35.000@ then + return -1 +endi + +if $data01 != 2 then + return -1 +endi + +if $data02 != 1 then + return -1 +endi + +sql select * from db.st2 where t2 > 200 or t2 is null; +if $rows != 0 then + return -1 +endi + +sql select * from st2 where t2 < 200 and t2 is null; +if $rows != 0 then + return -1 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/general/parser/tags_dynamically_specifiy.sim b/tests/script/general/parser/tags_dynamically_specifiy.sim index 0a5d5c9716..07bf4d8dd1 100644 --- a/tests/script/general/parser/tags_dynamically_specifiy.sim +++ b/tests/script/general/parser/tags_dynamically_specifiy.sim @@ -96,5 +96,10 @@ if $rows != 14 then return -1 endi +print ===============================> td-1765 +sql create table m1(ts timestamp, k int) tags(a binary(4), b nchar(4)); +sql create table tm0 using m1 tags('abcd', 'abcd'); +sql_error alter table tm0 set tag b = 'abcd1'; +sql_error alter table tm0 set tag a = 'abcd1'; system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/general/parser/union.sim b/tests/script/general/parser/union.sim index 9e178537a2..4af482bde0 100644 --- a/tests/script/general/parser/union.sim +++ b/tests/script/general/parser/union.sim @@ -251,7 +251,7 @@ if $rows != 15 then endi # first subclause are empty -sql select count(*) as c from union_tb0 where ts>now+10y union all select sum(c1) as c from union_tb1; +sql select count(*) as c from union_tb0 where ts > now + 3650d union all select sum(c1) as c from union_tb1; if $rows != 1 then return -1 endi @@ -346,7 +346,7 @@ if $data91 != 99 then return -1 endi -#1111111111111111111111111111111111111111111111111 +#================================================================================================= # two aggregated functions for normal tables sql select sum(c1) as a from union_tb0 limit 1 union all select sum(c3) as a from union_tb1 limit 2; if $rows != 2 then