[td-11818]fix bug in creating table.

This commit is contained in:
Haojun Liao 2022-01-20 22:57:29 +08:00
parent cc80848b56
commit 5a3a0f3d6c
7 changed files with 76 additions and 13 deletions

View File

@ -28,6 +28,7 @@ OP_ENUM_MACRO(DataBlocksOptScan)
OP_ENUM_MACRO(TableSeqScan) OP_ENUM_MACRO(TableSeqScan)
OP_ENUM_MACRO(TagScan) OP_ENUM_MACRO(TagScan)
OP_ENUM_MACRO(SystemTableScan) OP_ENUM_MACRO(SystemTableScan)
OP_ENUM_MACRO(StreamBlockScan)
OP_ENUM_MACRO(Aggregate) OP_ENUM_MACRO(Aggregate)
OP_ENUM_MACRO(Project) OP_ENUM_MACRO(Project)
// OP_ENUM_MACRO(Groupby) // OP_ENUM_MACRO(Groupby)

View File

@ -650,7 +650,13 @@ TEST(testCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use dbv");
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tx using st tags(111111111111111)");
if (taos_errno(pRes) != 0) {
printf("failed to create table, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "select count(*) from tu"); pRes = taos_query(pConn, "select count(*) from tu");

View File

@ -376,7 +376,7 @@ typedef struct STaskParam {
typedef struct SExchangeInfo { typedef struct SExchangeInfo {
SArray *pSources; SArray *pSources;
int32_t bytes; // total load bytes from remote uint64_t bytes; // total load bytes from remote
tsem_t ready; tsem_t ready;
void *pTransporter; void *pTransporter;
SRetrieveTableRsp *pRsp; SRetrieveTableRsp *pRsp;
@ -385,7 +385,7 @@ typedef struct SExchangeInfo {
typedef struct STableScanInfo { typedef struct STableScanInfo {
void *pTsdbReadHandle; void *pTsdbReadHandle;
int32_t numOfBlocks; int32_t numOfBlocks; // extract basic running information.
int32_t numOfSkipped; int32_t numOfSkipped;
int32_t numOfBlockStatis; int32_t numOfBlockStatis;
int64_t numOfRows; int64_t numOfRows;
@ -415,7 +415,11 @@ typedef struct STagScanInfo {
} STagScanInfo; } STagScanInfo;
typedef struct SStreamBlockScanInfo { typedef struct SStreamBlockScanInfo {
SSDataBlock *pRes; // result SSDataBlock
SColumnInfo *pCols; // the output column info
uint64_t numOfRows; // total scanned rows
uint64_t numOfExec; // execution times
void *readerHandle;// stream block reader handle
} SStreamBlockScanInfo; } SStreamBlockScanInfo;
typedef struct SOptrBasicInfo { typedef struct SOptrBasicInfo {
@ -423,7 +427,6 @@ typedef struct SOptrBasicInfo {
int32_t *rowCellInfoOffset; // offset value for each row result cell info int32_t *rowCellInfoOffset; // offset value for each row result cell info
SQLFunctionCtx *pCtx; SQLFunctionCtx *pCtx;
SSDataBlock *pRes; SSDataBlock *pRes;
void *keyBuf;
} SOptrBasicInfo; } SOptrBasicInfo;
typedef struct SOptrBasicInfo STableIntervalOperatorInfo; typedef struct SOptrBasicInfo STableIntervalOperatorInfo;

View File

@ -12,7 +12,8 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <parser.h> #include "parser.h"
#include "tq.h"
#include "exception.h" #include "exception.h"
#include "os.h" #include "os.h"
#include "tglobal.h" #include "tglobal.h"
@ -3576,7 +3577,7 @@ void setDefaultOutputBuf_rv(SAggOperatorInfo* pAggInfo, int64_t uid, int32_t sta
SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo; SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
int64_t tid = 0; int64_t tid = 0;
pInfo->keyBuf = realloc(pInfo->keyBuf, sizeof(tid) + sizeof(int64_t) + POINTER_BYTES); pAggInfo->keyBuf = realloc(pAggInfo->keyBuf, sizeof(tid) + sizeof(int64_t) + POINTER_BYTES);
SResultRow* pRow = doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid, pTaskInfo, false, pAggInfo); SResultRow* pRow = doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid, pTaskInfo, false, pAggInfo);
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
@ -5061,6 +5062,42 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
#endif #endif
} }
static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
// NOTE: this operator never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamBlockScanInfo* pInfo = pOperator->info;
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
while (tqNextDataBlock(pInfo->readerHandle)) {
pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
terrno = pTaskInfo->code;
return NULL;
}
if (pBlockInfo->rows == 0) {
return NULL;
}
pInfo->pRes->pDataBlock = tqRetrieveDataBlock(pInfo->readerHandle);
if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log
pTaskInfo->code = terrno;
return NULL;
}
break;
}
// record the scan action.
pInfo->numOfExec++;
pInfo->numOfRows += pBlockInfo->rows;
return (pBlockInfo->rows == 0)? NULL:pInfo->pRes;
}
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) { int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SExchangeInfo* pEx = (SExchangeInfo*) param; SExchangeInfo* pEx = (SExchangeInfo*) param;
pEx->pRsp = pMsg->pData; pEx->pRsp = pMsg->pData;
@ -5263,7 +5300,6 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
tfree(pInfo); tfree(pInfo);
tfree(pOperator); tfree(pOperator);
@ -5371,8 +5407,26 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt
return pOperator; return pOperator;
} }
SOperatorInfo* createSubmitBlockScanOperatorInfo(void *pSubmitBlockReadHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createStreamBlockScanOperatorInfo(void *pStreamBlockHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pInfo->readerHandle = pStreamBlockHandle;
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = OP_StreamBlockScan;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput;
pOperator->exec = doStreamBlockScan;
pOperator->pTaskInfo = pTaskInfo;
} }

View File

@ -680,10 +680,9 @@ int32_t doCheckAndBuildCreateTableReq(SCreateTableSql* pCreateTable, SParseConte
serializeVgroupTablesBatchImpl(&tbatch, pBufArray); serializeVgroupTablesBatchImpl(&tbatch, pBufArray);
destroyCreateTbReqBatch(&tbatch); destroyCreateTbReqBatch(&tbatch);
} else { // it is a child table, created according to a super table } else { // it is a child table, created according to a super table
code = doCheckAndBuildCreateCTableReq(pCreateTable, pCtx, pMsgBuf, &pBufArray); code = doCheckAndBuildCreateCTableReq(pCreateTable, pCtx, pMsgBuf, &pBufArray);
if (code != 0) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }

View File

@ -1656,7 +1656,7 @@ static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, cha
} }
// Remove quotation marks // Remove quotation marks
if (TK_STRING == type) { if (TSDB_DATA_TYPE_BINARY == type) {
if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) { if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z); return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
} }

View File

@ -84,7 +84,7 @@ void createDbAndStb() {
} }
taos_free_result(pRes); taos_free_result(pRes);
sprintf(qstr, "create table %s (ts timestamp, i int) tags (j int)", stbName); sprintf(qstr, "create table %s (ts timestamp, i int) tags (j bigint)", stbName);
pRes = taos_query(con, qstr); pRes = taos_query(con, qstr);
code = taos_errno(pRes); code = taos_errno(pRes);
if (code != 0) { if (code != 0) {