diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 0be65633af..401eeb5d87 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -237,7 +237,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo); void tscClearSubqueryInfo(SSqlCmd* pCmd); int32_t tscAddQueryInfo(SSqlCmd *pCmd); SQueryInfo *tscGetQueryInfo(SSqlCmd* pCmd); -SQueryInfo *tscGetQueryInfoS(SSqlCmd *pCmd, int32_t subClauseIndex); +SQueryInfo *tscGetQueryInfoS(SSqlCmd *pCmd); void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo); @@ -256,7 +256,6 @@ int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo); int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists); void tscResetForNextRetrieve(SSqlRes* pRes); -void tscDoQuery(SSqlObj* pSql); void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo); void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index b335b7d53d..af8d0a3b80 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -251,19 +251,32 @@ typedef struct { SVgroupsInfo *pVgroupInfo; } STableMetaVgroupInfo; +typedef struct SInsertStatementParam { + SName **pTableNameList; // all involved tableMeta list of current insert sql statement. + int32_t numOfTables; + SHashObj *pTableBlockHashList; // data block for each table + SArray *pDataBlocks; // SArray. Merged submit block for each vgroup + int8_t schemaAttached; // denote if submit block is built with table schema or not + STagData tagData; // NOTE: pTagData->data is used as a variant length array + int32_t dataSourceType; // from file or from sql statement + + char msg[512]; // error message + char *sql; // current sql statement position + uint32_t insertType; // TODO remove it +} SInsertStatementParam; + // TODO extract sql parser supporter typedef struct { int command; uint8_t msgType; + SInsertStatementParam insertParam; char reserve1[3]; // fix bus error on arm32 bool autoCreated; // create table if it is not existed during retrieve table meta in mnode union { int32_t count; - int32_t numOfTablesInSubmit; }; - uint32_t insertType; // TODO remove it char * curSql; // current sql, resume position of sql after parsing paused int8_t parseFinished; char reserve2[3]; // fix bus error on arm32 @@ -276,24 +289,22 @@ typedef struct { SHashObj *pTableMetaMap; // local buffer to keep the queried table meta, before validating the AST SQueryInfo *pQueryInfo; - - int32_t clauseIndex; // index of multiple subclause query SQueryInfo *active; // current active query info int32_t batchSize; // for parameter ('?') binding and batch processing int32_t numOfParams; int8_t dataSourceType; // load data from file or not - char reserve4[3]; // fix bus error on arm32 - int8_t submitSchema; // submit block is built with table schema - char reserve5[3]; // fix bus error on arm32 + char reserve4[3]; // fix bus error on arm32 +// int8_t submitSchema; // submit block is built with table schema + char reserve5[3]; // fix bus error on arm32 STagData tagData; // NOTE: pTagData->data is used as a variant length array - SName **pTableNameList; // all involved tableMeta list of current insert sql statement. - int32_t numOfTables; +// SName **pTableNameList; // all involved tableMeta list of current insert sql statement. +// int32_t numOfTables; - SHashObj *pTableBlockHashList; // data block for each table - SArray *pDataBlocks; // SArray. Merged submit block for each vgroup +// SHashObj *pTableBlockHashList; // data block for each table +// SArray *pDataBlocks; // SArray. Merged submit block for each vgroup int32_t resColumnId; } SSqlCmd; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 72432f602c..b968bfc613 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -460,7 +460,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { goto _error; } - if (pCmd->insertType == TSDB_QUERY_TYPE_STMT_INSERT) { + if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)){ STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0); code = tscGetTableMeta(pSql, pTableMetaInfo); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index d55521c0c8..467d42a731 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -441,7 +441,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, SSqlCmd *pCmd, int1 *str += index; if (sToken.type == TK_QUESTION) { - if (pCmd->insertType != TSDB_QUERY_TYPE_STMT_INSERT) { + if (pCmd->insertParam.insertType != TSDB_QUERY_TYPE_STMT_INSERT) { return tscSQLSyntaxErrMsg(pCmd->payload, "? only allowed in binding insertion", *str); } @@ -1120,9 +1120,9 @@ int tsParseInsertSql(SSqlObj *pSql) { return code; } - if (NULL == pCmd->pTableBlockHashList) { - pCmd->pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); - if (NULL == pCmd->pTableBlockHashList) { + if (NULL == pCmd->insertParam.pTableBlockHashList) { + pCmd->insertParam.pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + if (NULL == pCmd->insertParam.pTableBlockHashList) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _clean; } @@ -1130,7 +1130,7 @@ int tsParseInsertSql(SSqlObj *pSql) { str = pCmd->curSql; } - tscDebug("0x%"PRIx64" create data block list hashList:%p", pSql->self, pCmd->pTableBlockHashList); + tscDebug("0x%"PRIx64" create data block list hashList:%p", pSql->self, pCmd->insertParam.pTableBlockHashList); while (1) { int32_t index = 0; @@ -1241,7 +1241,7 @@ int tsParseInsertSql(SSqlObj *pSql) { } STableDataBlocks *dataBuf = NULL; - int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE, + int32_t ret = tscGetDataBlockFromList(pCmd->insertParam.pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), tinfo.rowSize, &pTableMetaInfo->name, pTableMeta, &dataBuf, NULL); if (ret != TSDB_CODE_SUCCESS) { @@ -1261,7 +1261,7 @@ int tsParseInsertSql(SSqlObj *pSql) { } STableDataBlocks *dataBuf = NULL; - int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE, + int32_t ret = tscGetDataBlockFromList(pCmd->insertParam.pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), tinfo.rowSize, &pTableMetaInfo->name, pTableMeta, &dataBuf, NULL); if (ret != TSDB_CODE_SUCCESS) { @@ -1297,7 +1297,8 @@ int tsParseInsertSql(SSqlObj *pSql) { goto _clean; } - if ((pCmd->insertType != TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCmd->pTableBlockHashList) > 0) { // merge according to vgId + // merge according to vgId + if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCmd->insertParam.pTableBlockHashList) > 0) { if ((code = tscMergeTableDataBlocks(pSql, true)) != TSDB_CODE_SUCCESS) { goto _clean; } @@ -1326,9 +1327,9 @@ int tsInsertInitialCheck(SSqlObj *pSql) { pCmd->count = 0; pCmd->command = TSDB_SQL_INSERT; - SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd); - TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType); + TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | TSDB_QUERY_TYPE_STMT_INSERT); sToken = tStrGetToken(pSql->sqlstr, &index, false); if (sToken.type != TK_INTO) { @@ -1410,7 +1411,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock return code; } - STableDataBlocks *pDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0); + STableDataBlocks *pDataBlock = taosArrayGetP(pCmd->insertParam.pDataBlocks, 0); if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) { return code; } @@ -1466,11 +1467,11 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow destroyTableNameList(pCmd); - pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); + pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); - if (pCmd->pTableBlockHashList == NULL) { - pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); - if (pCmd->pTableBlockHashList == NULL) { + if (pCmd->insertParam.pTableBlockHashList == NULL) { + pCmd->insertParam.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + if (pCmd->insertParam.pTableBlockHashList == NULL) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; } @@ -1478,7 +1479,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow STableDataBlocks *pTableDataBlock = NULL; int32_t ret = - tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk), + tscGetDataBlockFromList(pCmd->insertParam.pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk), tinfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pTableDataBlock, NULL); if (ret != TSDB_CODE_SUCCESS) { pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index e76219b320..b90776d0a8 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -308,7 +308,7 @@ static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) { int32_t fillTablesColumnsNull(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; - STableDataBlocks** p = taosHashIterate(pCmd->pTableBlockHashList, NULL); + STableDataBlocks** p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL); STableDataBlocks* pOneTableBlock = *p; while(pOneTableBlock) { @@ -317,7 +317,7 @@ int32_t fillTablesColumnsNull(SSqlObj* pSql) { fillColumnsNull(pOneTableBlock, pBlocks->numOfRows); } - p = taosHashIterate(pCmd->pTableBlockHashList, p); + p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p); if (p == NULL) { break; } @@ -840,12 +840,12 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) { STableDataBlocks* pBlock = NULL; if (pStmt->multiTbInsert) { - if (pCmd->pTableBlockHashList == NULL) { + if (pCmd->insertParam.pTableBlockHashList == NULL) { tscError("0x%"PRIx64" Table block hash list is empty", pStmt->pSql->self); return TSDB_CODE_TSC_APP_ERROR; } - STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pCmd->pTableBlockHashList, (const char*)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid)); + STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pCmd->insertParam.pTableBlockHashList, (const char*)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid)); if (t1 == NULL) { tscError("0x%"PRIx64" no table data block in hash list, uid:%" PRId64 , pStmt->pSql->self, pStmt->mtb.currentUid); return TSDB_CODE_TSC_APP_ERROR; @@ -856,12 +856,12 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) { STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0); STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; - if (pCmd->pTableBlockHashList == NULL) { - pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + if (pCmd->insertParam.pTableBlockHashList == NULL) { + pCmd->insertParam.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); } int32_t ret = - tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk), + tscGetDataBlockFromList(pCmd->insertParam.pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk), pTableMeta->tableInfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pBlock, NULL); if (ret != 0) { return ret; @@ -904,12 +904,12 @@ static int insertStmtBindParamBatch(STscStmt* stmt, TAOS_MULTI_BIND* bind, int c STableDataBlocks* pBlock = NULL; if (pStmt->multiTbInsert) { - if (pCmd->pTableBlockHashList == NULL) { + if (pCmd->insertParam.pTableBlockHashList == NULL) { tscError("0x%"PRIx64" Table block hash list is empty", pStmt->pSql->self); return TSDB_CODE_TSC_APP_ERROR; } - STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pCmd->pTableBlockHashList, (const char*)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid)); + STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pCmd->insertParam.pTableBlockHashList, (const char*)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid)); if (t1 == NULL) { tscError("0x%"PRIx64" no table data block in hash list, uid:%" PRId64 , pStmt->pSql->self, pStmt->mtb.currentUid); return TSDB_CODE_TSC_APP_ERROR; @@ -920,12 +920,12 @@ static int insertStmtBindParamBatch(STscStmt* stmt, TAOS_MULTI_BIND* bind, int c STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0); STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; - if (pCmd->pTableBlockHashList == NULL) { - pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + if (pCmd->insertParam.pTableBlockHashList == NULL) { + pCmd->insertParam.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); } int32_t ret = - tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk), + tscGetDataBlockFromList(pCmd->insertParam.pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk), pTableMeta->tableInfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pBlock, NULL); if (ret != 0) { return ret; @@ -991,11 +991,11 @@ static int insertStmtUpdateBatch(STscStmt* stmt) { return TSDB_CODE_TSC_APP_ERROR; } - if (taosHashGetSize(pCmd->pTableBlockHashList) == 0) { + if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) == 0) { return TSDB_CODE_SUCCESS; } - STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pCmd->pTableBlockHashList, (const char*)&stmt->mtb.currentUid, sizeof(stmt->mtb.currentUid)); + STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pCmd->insertParam.pTableBlockHashList, (const char*)&stmt->mtb.currentUid, sizeof(stmt->mtb.currentUid)); if (t1 == NULL) { tscError("0x%"PRIx64" no table data block in hash list, uid:%" PRId64 , pSql->self, stmt->mtb.currentUid); return TSDB_CODE_TSC_APP_ERROR; @@ -1031,9 +1031,9 @@ static int insertStmtReset(STscStmt* pStmt) { if (pCmd->batchSize > 2) { int32_t alloced = (pCmd->batchSize + 1) / 2; - size_t size = taosArrayGetSize(pCmd->pDataBlocks); + size_t size = taosArrayGetSize(pCmd->insertParam.pDataBlocks); for (int32_t i = 0; i < size; ++i) { - STableDataBlocks* pBlock = taosArrayGetP(pCmd->pDataBlocks, i); + STableDataBlocks* pBlock = taosArrayGetP(pCmd->insertParam.pDataBlocks, i); uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk); pBlock->size = sizeof(SSubmitBlk) + totalDataSize / alloced; @@ -1055,21 +1055,21 @@ static int insertStmtExecute(STscStmt* stmt) { return TSDB_CODE_TSC_INVALID_VALUE; } - if (taosHashGetSize(pCmd->pTableBlockHashList) == 0) { + if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) == 0) { return TSDB_CODE_SUCCESS; } STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0); STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; - if (pCmd->pTableBlockHashList == NULL) { - pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + if (pCmd->insertParam.pTableBlockHashList == NULL) { + pCmd->insertParam.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); } STableDataBlocks* pBlock = NULL; int32_t ret = - tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk), + tscGetDataBlockFromList(pCmd->insertParam.pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk), pTableMeta->tableInfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pBlock, NULL); assert(ret == 0); pBlock->size = sizeof(SSubmitBlk) + pCmd->batchSize * pBlock->rowSize; @@ -1086,7 +1086,7 @@ static int insertStmtExecute(STscStmt* stmt) { return code; } - STableDataBlocks* pDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0); + STableDataBlocks* pDataBlock = taosArrayGetP(pCmd->insertParam.pDataBlocks, 0); code = tscCopyDataBlockToPayload(stmt->pSql, pDataBlock); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1112,7 +1112,7 @@ static int insertStmtExecute(STscStmt* stmt) { pCmd->numOfTables = 0; tfree(pCmd->pTableNameList); - pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); + pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); return pSql->res.code; } @@ -1120,7 +1120,7 @@ static int insertStmtExecute(STscStmt* stmt) { static void insertBatchClean(STscStmt* pStmt) { SSqlCmd *pCmd = &pStmt->pSql->cmd; SSqlObj *pSql = pStmt->pSql; - int32_t size = taosHashGetSize(pCmd->pTableBlockHashList); + int32_t size = taosHashGetSize(pCmd->insertParam.pTableBlockHashList); // data block reset pCmd->batchSize = 0; @@ -1134,7 +1134,7 @@ static void insertBatchClean(STscStmt* pStmt) { tfree(pCmd->pTableNameList); /* - STableDataBlocks** p = taosHashIterate(pCmd->pTableBlockHashList, NULL); + STableDataBlocks** p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL); STableDataBlocks* pOneTableBlock = *p; @@ -1145,7 +1145,7 @@ static void insertBatchClean(STscStmt* pStmt) { pBlocks->numOfRows = 0; - p = taosHashIterate(pCmd->pTableBlockHashList, p); + p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p); if (p == NULL) { break; } @@ -1154,10 +1154,10 @@ static void insertBatchClean(STscStmt* pStmt) { } */ - pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); + pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); pCmd->numOfTables = 0; - taosHashEmpty(pCmd->pTableBlockHashList); + taosHashEmpty(pCmd->insertParam.pTableBlockHashList); tscFreeSqlResult(pSql); tscFreeSubobj(pSql); tfree(pSql->pSubs); @@ -1376,7 +1376,7 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) { SSubmitBlk* pBlk = (SSubmitBlk*) (*t1)->pData; pCmd->batchSize = pBlk->numOfRows; - taosHashPut(pCmd->pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)t1, POINTER_BYTES); + taosHashPut(pCmd->insertParam.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)t1, POINTER_BYTES); tscDebug("0x%"PRIx64" table:%s is already prepared, uid:%" PRIu64, pSql->self, name, pStmt->mtb.currentUid); return TSDB_CODE_SUCCESS; @@ -1391,11 +1391,11 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) { pSql->cmd.numOfParams = 0; pSql->cmd.batchSize = 0; - if (taosHashGetSize(pCmd->pTableBlockHashList) > 0) { - SHashObj* hashList = pCmd->pTableBlockHashList; - pCmd->pTableBlockHashList = NULL; + if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) > 0) { + SHashObj* hashList = pCmd->insertParam.pTableBlockHashList; + pCmd->insertParam.pTableBlockHashList = NULL; tscResetSqlCmd(pCmd, true); - pCmd->pTableBlockHashList = hashList; + pCmd->insertParam.pTableBlockHashList = hashList; } int32_t code = tsParseSql(pStmt->pSql, true); @@ -1411,7 +1411,7 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) { STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableDataBlocks* pBlock = NULL; - code = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk), + code = tscGetDataBlockFromList(pCmd->insertParam.pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk), pTableMeta->tableInfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pBlock, NULL); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1688,14 +1688,14 @@ int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes) { SSqlCmd* pCmd = &pStmt->pSql->cmd; STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0); STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; - if (pCmd->pTableBlockHashList == NULL) { - pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + if (pCmd->insertParam.pTableBlockHashList == NULL) { + pCmd->insertParam.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); } STableDataBlocks* pBlock = NULL; int32_t ret = - tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk), + tscGetDataBlockFromList(pCmd->insertParam.pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk), pTableMeta->tableInfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pBlock, NULL); if (ret != 0) { // todo handle error diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index ae079e7517..0c77b5da0a 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -293,7 +293,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { return tscSQLSyntaxErrMsg(tscGetErrorMsgPayload(pCmd), NULL, pInfo->msg); } - SQueryInfo* pQueryInfo = tscGetQueryInfoS(pCmd, pCmd->clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfoS(pCmd); if (pQueryInfo == NULL) { pRes->code = terrno; return pRes->code; @@ -656,7 +656,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } tscPrintSelNodeList(pSql, i); - pCmd->clauseIndex += 1; if ((i + 1) < size && pQueryInfo->sibling == NULL) { if ((code = tscAddQueryInfo(pCmd)) != TSDB_CODE_SUCCESS) { @@ -671,9 +670,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { return code; } - // restore the clause index - pCmd->clauseIndex = 0; - // set the command/global limit parameters from the first subclause to the sqlcmd object pCmd->active = pCmd->pQueryInfo; pCmd->command = pCmd->pQueryInfo->command; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 289ce525f2..aaf40ec5e3 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -584,22 +584,22 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd); STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; - char* pMsg = pSql->cmd.payload; - - // NOTE: shell message size should not include SMsgDesc - int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc); - - SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg; - pMsgDesc->numOfVnodes = htonl(1); // always one vnode - - pMsg += sizeof(SMsgDesc); - SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg; - - pShellMsg->header.vgId = htonl(pTableMeta->vgId); - pShellMsg->header.contLen = htonl(size); // the length not includes the size of SMsgDesc - pShellMsg->length = pShellMsg->header.contLen; - - pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of tables to be inserted +// char* pMsg = pSql->cmd.payload; +// +// // NOTE: shell message size should not include SMsgDesc +// int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc); +// +// SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg; +// pMsgDesc->numOfVnodes = htonl(1); // always one vnode +// +// pMsg += sizeof(SMsgDesc); +// SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg; +// +// pShellMsg->header.vgId = htonl(pTableMeta->vgId); // data in current block all routes to the same vgroup +// pShellMsg->header.contLen = htonl(size); // the length not includes the size of SMsgDesc +// pShellMsg->length = pShellMsg->header.contLen; +// +// pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // the number of tables to be inserted // pSql->cmd.payloadLen is set during copying data into payload pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; @@ -608,15 +608,15 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo)); tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); - tscDebug("0x%"PRIx64" build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql->self, pTableMeta->vgId, pSql->cmd.numOfTablesInSubmit, - pSql->epSet.numOfEps); + tscDebug("0x%"PRIx64" submit msg built, numberOfEP:%d", pSql->self, pSql->epSet.numOfEps); + return TSDB_CODE_SUCCESS; } /* * for table query, simply return the size <= 1k */ -static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) { +static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql) { const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5; SSqlCmd* pCmd = &pSql->cmd; @@ -815,7 +815,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; int32_t code = TSDB_CODE_SUCCESS; - int32_t size = tscEstimateQueryMsgSize(pSql, pCmd->clauseIndex); + int32_t size = tscEstimateQueryMsgSize(pSql); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { tscError("%p failed to malloc for query msg", pSql); @@ -2155,7 +2155,7 @@ static void createHbObj(STscObj* pObj) { pSql->fp = tscProcessHeartBeatRsp; - SQueryInfo *pQueryInfo = tscGetQueryInfoS(&pSql->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoS(&pSql->cmd); if (pQueryInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; tfree(pSql); @@ -2369,7 +2369,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn tscAddQueryInfo(&pNew->cmd); - SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd, 0); + SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd); pNew->cmd.autoCreated = pSql->cmd.autoCreated; // create table if not exists if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) { @@ -2592,7 +2592,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) { pNew->cmd.command = TSDB_SQL_STABLEVGROUP; // TODO TEST IT - SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd, 0); + SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd); if (pNewQueryInfo == NULL) { tscFreeSqlObj(pNew); return code; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 8a70502da6..09765f5ce6 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -901,9 +901,9 @@ int taos_validate_sql(TAOS *taos, const char *sql) { strtolower(pSql->sqlstr, sql); pCmd->curSql = NULL; - if (NULL != pCmd->pTableBlockHashList) { - taosHashCleanup(pCmd->pTableBlockHashList); - pCmd->pTableBlockHashList = NULL; + if (NULL != pCmd->insertParam.pTableBlockHashList) { + taosHashCleanup(pCmd->insertParam.pTableBlockHashList); + pCmd->insertParam.pTableBlockHashList = NULL; } pSql->fp = asyncCallback; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 3909b81f59..2c6eb4f3e2 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3056,7 +3056,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) // clean up tableMeta in cache tscFreeQueryInfo(&pSql->cmd, false); - SQueryInfo* pQueryInfo = tscGetQueryInfoS(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoS(&pSql->cmd); STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, 0); tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); @@ -3145,7 +3145,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } - pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks); + pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->insertParam.pDataBlocks); assert(pSql->subState.numOfSub > 0); pRes->code = TSDB_CODE_SUCCESS; @@ -3195,7 +3195,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { pNew->fetchFp = pNew->fp; pSql->pSubs[numOfSub] = pNew; - STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, numOfSub); + STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->insertParam.pDataBlocks, numOfSub); pRes->code = tscCopyDataBlockToPayload(pNew, pTableDataBlock); if (pRes->code == TSDB_CODE_SUCCESS) { tscDebug("0x%"PRIx64" sub:%p create subObj success. orderOfSub:%d", pSql->self, pNew, numOfSub); @@ -3213,7 +3213,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { goto _error; } - pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); + pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); // use the local variable for (int32_t j = 0; j < numOfSub; ++j) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 32c85ab43f..6f3fd1a253 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1130,8 +1130,8 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) { destroyTableNameList(pCmd); - pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList, removeMeta); - pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); + pCmd->insertParam.pTableBlockHashList = tscDestroyBlockHashTable(pCmd->insertParam.pTableBlockHashList, removeMeta); + pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); tscFreeQueryInfo(pCmd, removeMeta); if (pCmd->pTableMetaMap != NULL) { @@ -1343,12 +1343,9 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { SSqlCmd* pCmd = &pSql->cmd; assert(pDataBlock->pTableMeta != NULL); - pCmd->numOfTablesInSubmit = pDataBlock->numOfTables; - -// assert(pCmd->numOfClause == 1); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0); - // todo refactor + // todo remove it later // set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache if (pTableMetaInfo->pTableMeta != pDataBlock->pTableMeta) { tNameAssign(&pTableMetaInfo->name, &pDataBlock->tableName); @@ -1358,13 +1355,13 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { } pTableMetaInfo->pTableMeta = tscTableMetaDup(pDataBlock->pTableMeta); - pTableMetaInfo->tableMetaSize = tscGetTableMetaSize(pDataBlock->pTableMeta); + pTableMetaInfo->tableMetaSize = tscGetTableMetaSize(pDataBlock->pTableMeta); } /* - * the submit message consists of : [RPC header|message body|digest] - * the dataBlock only includes the RPC Header buffer and actual submit message body, space for digest needs - * additional space. + * the format of submit message is as follows [RPC header|message body|digest] + * the dataBlock only includes the RPC Header buffer and actual submit message body, + * space for digest needs additional space. */ int ret = tscAllocPayload(pCmd, pDataBlock->size + 100); if (TSDB_CODE_SUCCESS != ret) { @@ -1374,13 +1371,24 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { assert(pDataBlock->size <= pDataBlock->nAllocSize); memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->size); - /* - * the payloadLen should be actual message body size - * the old value of payloadLen is the allocated payload size - */ + //the payloadLen should be actual message body size, the old value of payloadLen is the allocated payload size pCmd->payloadLen = pDataBlock->size; + // NOTE: shell message size should not include SMsgDesc + int32_t size = pCmd->payloadLen - sizeof(SMsgDesc); + + SMsgDesc* pMsgDesc = (SMsgDesc*) pCmd->payload; + pMsgDesc->numOfVnodes = htonl(1); // always for one vnode + + SSubmitMsg *pShellMsg = (SSubmitMsg *)(pCmd->payload + sizeof(SMsgDesc)); + pShellMsg->header.vgId = htonl(pDataBlock->pTableMeta->vgId); // data in current block all routes to the same vgroup + pShellMsg->header.contLen = htonl(size); // the length not includes the size of SMsgDesc + pShellMsg->length = pShellMsg->header.contLen; + pShellMsg->numOfBlocks = htonl(pDataBlock->numOfTables); // the number of tables to be inserted + assert(pCmd->allocSize >= (uint32_t)(pCmd->payloadLen + 100) && pCmd->payloadLen > 0); + + tscDebug("0x%"PRIx64" submit msg built, vgId:%d numOfTables:%d", pSql->self, pDataBlock->pTableMeta->vgId, pDataBlock->numOfTables); return TSDB_CODE_SUCCESS; } @@ -1542,25 +1550,25 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { } static void extractTableNameList(SSqlCmd* pCmd, bool freeBlockMap) { - pCmd->numOfTables = (int32_t) taosHashGetSize(pCmd->pTableBlockHashList); + pCmd->numOfTables = (int32_t) taosHashGetSize(pCmd->insertParam.pTableBlockHashList); if (pCmd->pTableNameList == NULL) { pCmd->pTableNameList = calloc(pCmd->numOfTables, POINTER_BYTES); } else { memset(pCmd->pTableNameList, 0, pCmd->numOfTables * POINTER_BYTES); } - STableDataBlocks **p1 = taosHashIterate(pCmd->pTableBlockHashList, NULL); + STableDataBlocks **p1 = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL); int32_t i = 0; while(p1) { STableDataBlocks* pBlocks = *p1; tfree(pCmd->pTableNameList[i]); pCmd->pTableNameList[i++] = tNameDup(&pBlocks->tableName); - p1 = taosHashIterate(pCmd->pTableBlockHashList, p1); + p1 = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p1); } if (freeBlockMap) { - pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList, false); + pCmd->insertParam.pTableBlockHashList = tscDestroyBlockHashTable(pCmd->insertParam.pTableBlockHashList, false); } } @@ -1571,7 +1579,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) { void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); - STableDataBlocks** p = taosHashIterate(pCmd->pTableBlockHashList, NULL); + STableDataBlocks** p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL); STableDataBlocks* pOneTableBlock = *p; while(pOneTableBlock) { @@ -1642,7 +1650,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) { tscDebug("0x%"PRIx64" table %s data block is empty", pSql->self, pOneTableBlock->tableName.tname); } - p = taosHashIterate(pCmd->pTableBlockHashList, p); + p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p); if (p == NULL) { break; } @@ -1653,7 +1661,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) { extractTableNameList(pCmd, freeBlockMap); // free the table data blocks; - pCmd->pDataBlocks = pVnodeDataBlockList; + pCmd->insertParam.pDataBlocks = pVnodeDataBlockList; taosHashCleanup(pVnodeDataBlockHashList); return TSDB_CODE_SUCCESS; @@ -1848,7 +1856,6 @@ void* sqlExprDestroy(SExprInfo* pExpr) { tExprTreeDestroy(pExpr->pExpr, NULL); } - printf("free---------------%p\n", pExpr); tfree(pExpr); return NULL; } @@ -1918,7 +1925,6 @@ SExprInfo* tscExprCreate(SQueryInfo* pQueryInfo, int16_t functionId, SColumnInde return NULL; } - printf("malloc======================%p\n", pExpr); SSqlExpr* p = &pExpr->base; p->functionId = functionId; @@ -2625,7 +2631,7 @@ STableMetaInfo* tscGetMetaInfo(SQueryInfo* pQueryInfo, int32_t tableIndex) { return pQueryInfo->pTableMetaInfo[tableIndex]; } -SQueryInfo* tscGetQueryInfoS(SSqlCmd* pCmd, int32_t subClauseIndex) { +SQueryInfo* tscGetQueryInfoS(SSqlCmd* pCmd) { SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); int32_t ret = TSDB_CODE_SUCCESS; @@ -3025,9 +3031,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in pNew->sqlstr = NULL; pNew->maxRetry = TSDB_MAX_REPLICA; - SQueryInfo* pQueryInfo = tscGetQueryInfoS(pCmd, 0); - - assert(pSql->cmd.clauseIndex == 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoS(pCmd); STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0); tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); @@ -3102,7 +3106,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pnCmd->pTableMetaMap = NULL; pnCmd->pQueryInfo = NULL; - pnCmd->clauseIndex = 0; pnCmd->pDataBlocks = NULL; pnCmd->numOfTables = 0; @@ -3401,71 +3404,6 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { doExecuteQuery(pSql, pQueryInfo); } -/** - * todo remove it - * To decide if current is a two-stage super table query, join query, or insert. And invoke different - * procedure accordingly - * @param pSql - */ -void tscDoQuery(SSqlObj* pSql) { - SSqlCmd* pCmd = &pSql->cmd; - SSqlRes* pRes = &pSql->res; - - pRes->code = TSDB_CODE_SUCCESS; - - if (pCmd->command > TSDB_SQL_LOCAL) { - tscProcessLocalCmd(pSql); - return; - } - - if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) { - tscImportDataFromFile(pSql); - } else { - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); - uint16_t type = pQueryInfo->type; - - if ((pCmd->command == TSDB_SQL_SELECT) && (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_SUBQUERY)) && (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_STABLE_SUBQUERY))) { - tscAddIntoSqlList(pSql); - } - - if (TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_INSERT)) { // multi-vnodes insertion - tscHandleMultivnodeInsert(pSql); - return; - } - - if (QUERY_IS_JOIN_QUERY(type)) { - if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_SUBQUERY)) { - tscHandleMasterJoinQuery(pSql); - } else { // for first stage sub query, iterate all vnodes to get all timestamp - if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { - tscBuildAndSendRequest(pSql, NULL); - } else { // secondary stage join query. - if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query - tscLockByThread(&pSql->squeryLock); - tscHandleMasterSTableQuery(pSql); - tscUnlockByThread(&pSql->squeryLock); - } else { - tscBuildAndSendRequest(pSql, NULL); - } - } - } - - return; - } else if (tscMultiRoundQuery(pQueryInfo, 0) && pQueryInfo->round == 0) { - tscHandleFirstRoundStableQuery(pSql); // todo lock? - return; - } else if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query - tscLockByThread(&pSql->squeryLock); - tscHandleMasterSTableQuery(pSql); - tscUnlockByThread(&pSql->squeryLock); - return; - } - - pCmd->active = pQueryInfo; - tscBuildAndSendRequest(pSql, NULL); - } -} - int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) { int32_t i = 0; while (i < TSDB_MAX_JOIN_TABLE_NUM) { @@ -3687,7 +3625,6 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; - pCmd->clauseIndex++; SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); pSql->cmd.command = pQueryInfo->command; @@ -3708,7 +3645,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) { pSql->subState.numOfSub = 0; pSql->fp = fp; - tscDebug("0x%"PRIx64" try data in the next subclause:%d", pSql->self, pCmd->clauseIndex); + tscDebug("0x%"PRIx64" try data in the next subclause", pSql->self); if (pCmd->command > TSDB_SQL_LOCAL) { tscProcessLocalCmd(pSql); } else { @@ -4283,7 +4220,7 @@ int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t lengt int code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; char *str = (char *)pNameList; - SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd); if (pQueryInfo == NULL) { pSql->res.code = terrno; return terrno;