diff --git a/include/libs/planner/plannerOp.h b/include/libs/planner/plannerOp.h index 5cc896f1c2..2793e72635 100644 --- a/include/libs/planner/plannerOp.h +++ b/include/libs/planner/plannerOp.h @@ -28,6 +28,7 @@ OP_ENUM_MACRO(DataBlocksOptScan) OP_ENUM_MACRO(TableSeqScan) OP_ENUM_MACRO(TagScan) OP_ENUM_MACRO(SystemTableScan) +OP_ENUM_MACRO(StreamBlockScan) OP_ENUM_MACRO(Aggregate) OP_ENUM_MACRO(Project) // OP_ENUM_MACRO(Groupby) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 6f9715a09f..440ef0d728 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -650,7 +650,13 @@ TEST(testCase, agg_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); - TAOS_RES* pRes = taos_query(pConn, "use abc1"); + TAOS_RES* pRes = taos_query(pConn, "use dbv"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table tx using st tags(111111111111111)"); + if (taos_errno(pRes) != 0) { + printf("failed to create table, reason:%s\n", taos_errstr(pRes)); + } taos_free_result(pRes); pRes = taos_query(pConn, "select count(*) from tu"); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index dd633002db..2fe3392b25 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -376,7 +376,7 @@ typedef struct STaskParam { typedef struct SExchangeInfo { SArray *pSources; - int32_t bytes; // total load bytes from remote + uint64_t bytes; // total load bytes from remote tsem_t ready; void *pTransporter; SRetrieveTableRsp *pRsp; @@ -385,7 +385,7 @@ typedef struct SExchangeInfo { typedef struct STableScanInfo { void *pTsdbReadHandle; - int32_t numOfBlocks; + int32_t numOfBlocks; // extract basic running information. int32_t numOfSkipped; int32_t numOfBlockStatis; int64_t numOfRows; @@ -415,7 +415,11 @@ typedef struct STagScanInfo { } STagScanInfo; typedef struct SStreamBlockScanInfo { - + SSDataBlock *pRes; // result SSDataBlock + SColumnInfo *pCols; // the output column info + uint64_t numOfRows; // total scanned rows + uint64_t numOfExec; // execution times + void *readerHandle;// stream block reader handle } SStreamBlockScanInfo; typedef struct SOptrBasicInfo { @@ -423,7 +427,6 @@ typedef struct SOptrBasicInfo { int32_t *rowCellInfoOffset; // offset value for each row result cell info SQLFunctionCtx *pCtx; SSDataBlock *pRes; - void *keyBuf; } SOptrBasicInfo; typedef struct SOptrBasicInfo STableIntervalOperatorInfo; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2b5faba29d..dc4d9c7238 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -12,7 +12,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include +#include "parser.h" +#include "tq.h" #include "exception.h" #include "os.h" #include "tglobal.h" @@ -3576,7 +3577,7 @@ void setDefaultOutputBuf_rv(SAggOperatorInfo* pAggInfo, int64_t uid, int32_t sta SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo; int64_t tid = 0; - pInfo->keyBuf = realloc(pInfo->keyBuf, sizeof(tid) + sizeof(int64_t) + POINTER_BYTES); + pAggInfo->keyBuf = realloc(pAggInfo->keyBuf, sizeof(tid) + sizeof(int64_t) + POINTER_BYTES); SResultRow* pRow = doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid, pTaskInfo, false, pAggInfo); for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { @@ -5061,6 +5062,42 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) { #endif } +static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) { + SOperatorInfo* pOperator = (SOperatorInfo*)param; + + // NOTE: this operator never check if current status is done or not + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStreamBlockScanInfo* pInfo = pOperator->info; + + SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; + while (tqNextDataBlock(pInfo->readerHandle)) { + pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + terrno = pTaskInfo->code; + return NULL; + } + + if (pBlockInfo->rows == 0) { + return NULL; + } + + pInfo->pRes->pDataBlock = tqRetrieveDataBlock(pInfo->readerHandle); + if (pInfo->pRes->pDataBlock == NULL) { + // TODO add log + pTaskInfo->code = terrno; + return NULL; + } + + break; + } + + // record the scan action. + pInfo->numOfExec++; + pInfo->numOfRows += pBlockInfo->rows; + + return (pBlockInfo->rows == 0)? NULL:pInfo->pRes; +} + int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) { SExchangeInfo* pEx = (SExchangeInfo*) param; pEx->pRsp = pMsg->pData; @@ -5263,7 +5300,6 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL) { tfree(pInfo); tfree(pOperator); @@ -5371,8 +5407,26 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt return pOperator; } -SOperatorInfo* createSubmitBlockScanOperatorInfo(void *pSubmitBlockReadHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createStreamBlockScanOperatorInfo(void *pStreamBlockHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { + SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo)); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + tfree(pInfo); + tfree(pOperator); + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + pInfo->readerHandle = pStreamBlockHandle; + + pOperator->name = "StreamBlockScanOperator"; + pOperator->operatorType = OP_StreamBlockScan; + pOperator->blockingOptr = false; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->numOfOutput = numOfOutput; + pOperator->exec = doStreamBlockScan; + pOperator->pTaskInfo = pTaskInfo; } diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 662e550655..5852678880 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -680,10 +680,9 @@ int32_t doCheckAndBuildCreateTableReq(SCreateTableSql* pCreateTable, SParseConte serializeVgroupTablesBatchImpl(&tbatch, pBufArray); destroyCreateTbReqBatch(&tbatch); - } else { // it is a child table, created according to a super table code = doCheckAndBuildCreateCTableReq(pCreateTable, pCtx, pMsgBuf, &pBufArray); - if (code != 0) { + if (code != TSDB_CODE_SUCCESS) { return code; } } diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index 1793c85e30..b8545b7486 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -1656,7 +1656,7 @@ static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, cha } // Remove quotation marks - if (TK_STRING == type) { + if (TSDB_DATA_TYPE_BINARY == type) { if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) { return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z); } diff --git a/tests/test/c/create_table.c b/tests/test/c/create_table.c index d387bf483b..2dd5f04a58 100644 --- a/tests/test/c/create_table.c +++ b/tests/test/c/create_table.c @@ -84,7 +84,7 @@ void createDbAndStb() { } taos_free_result(pRes); - sprintf(qstr, "create table %s (ts timestamp, i int) tags (j int)", stbName); + sprintf(qstr, "create table %s (ts timestamp, i int) tags (j bigint)", stbName); pRes = taos_query(con, qstr); code = taos_errno(pRes); if (code != 0) {