feat:support decimal in raw block
This commit is contained in:
parent
0ca18d5585
commit
16979f9dc8
|
@ -219,6 +219,7 @@ typedef struct STqReader {
|
||||||
SSDataBlock *pResBlock;
|
SSDataBlock *pResBlock;
|
||||||
int64_t lastTs;
|
int64_t lastTs;
|
||||||
bool hasPrimaryKey;
|
bool hasPrimaryKey;
|
||||||
|
SExtSchema *extSchema;
|
||||||
} STqReader;
|
} STqReader;
|
||||||
|
|
||||||
STqReader *tqReaderOpen(SVnode *pVnode);
|
STqReader *tqReaderOpen(SVnode *pVnode);
|
||||||
|
|
|
@ -167,8 +167,10 @@ int32_t metaDropMultipleTables(SMeta* pMeta, int64_t version, SArray* tb
|
||||||
int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount);
|
int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount);
|
||||||
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
|
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
|
||||||
int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
|
int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
|
||||||
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
|
|
||||||
int64_t metaGetTableCreateTime(SMeta* pMeta, tb_uid_t uid, int lock);
|
int64_t metaGetTableCreateTime(SMeta* pMeta, tb_uid_t uid, int lock);
|
||||||
|
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t* createTime, SExtSchema** extSchema);
|
||||||
|
SExtSchema* metaCloneSExtSchema(const SMetaEntry *pME);
|
||||||
|
void metaFreeSExtSchema(SExtSchema *p);
|
||||||
int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
|
int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
|
||||||
int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
|
int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
|
||||||
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
|
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
|
||||||
|
|
|
@ -70,6 +70,32 @@ static int32_t metaDecodeExtSchemas(SDecoder* pDecoder, SMetaEntry* pME) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SExtSchema* metaCloneSExtSchema(const SMetaEntry *pME) {
|
||||||
|
const SSchemaWrapper *pSchWrapper = NULL;
|
||||||
|
bool hasTypeMods = false;
|
||||||
|
if (pME->type == TSDB_SUPER_TABLE) {
|
||||||
|
pSchWrapper = &pME->stbEntry.schemaRow;
|
||||||
|
} else if (pME->type == TSDB_NORMAL_TABLE) {
|
||||||
|
pSchWrapper = &pME->ntbEntry.schemaRow;
|
||||||
|
} else {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
hasTypeMods = schemasHasTypeMod(pSchWrapper->pSchema, pSchWrapper->nCols);
|
||||||
|
|
||||||
|
if (hasTypeMods) {
|
||||||
|
SExtSchema* ret = taosMemoryMalloc(sizeof(SExtSchema) * pSchWrapper->nCols);
|
||||||
|
memcpy(ret, pME->pExtSchemas, pSchWrapper->nCols * sizeof(SExtSchema));
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void metaFreeSExtSchema(SExtSchema *p) {
|
||||||
|
if (p) {
|
||||||
|
taosMemoryFreeClear(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int meteEncodeColCmprEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
int meteEncodeColCmprEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
||||||
const SColCmprWrapper *pw = &pME->colCmpr;
|
const SColCmprWrapper *pw = &pME->colCmpr;
|
||||||
TAOS_CHECK_RETURN(tEncodeI32v(pCoder, pw->nCols));
|
TAOS_CHECK_RETURN(tEncodeI32v(pCoder, pw->nCols));
|
||||||
|
|
|
@ -378,7 +378,7 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
|
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t *createTime, SExtSchema** extSchema) {
|
||||||
void *pData = NULL;
|
void *pData = NULL;
|
||||||
int nData = 0;
|
int nData = 0;
|
||||||
int64_t version;
|
int64_t version;
|
||||||
|
@ -409,6 +409,7 @@ _query:
|
||||||
if (me.type == TSDB_SUPER_TABLE) {
|
if (me.type == TSDB_SUPER_TABLE) {
|
||||||
if (sver == -1 || sver == me.stbEntry.schemaRow.version) {
|
if (sver == -1 || sver == me.stbEntry.schemaRow.version) {
|
||||||
pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow);
|
pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow);
|
||||||
|
if (extSchema != NULL) *extSchema = metaCloneSExtSchema(&me);
|
||||||
tDecoderClear(&dc);
|
tDecoderClear(&dc);
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
@ -419,6 +420,7 @@ _query:
|
||||||
} else {
|
} else {
|
||||||
if (sver == -1 || sver == me.ntbEntry.schemaRow.version) {
|
if (sver == -1 || sver == me.ntbEntry.schemaRow.version) {
|
||||||
pSchema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
|
pSchema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
|
||||||
|
if (extSchema != NULL) *extSchema = metaCloneSExtSchema(&me);
|
||||||
tDecoderClear(&dc);
|
tDecoderClear(&dc);
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
@ -435,6 +437,7 @@ _query:
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
pSchema = tCloneSSchemaWrapper(&schema);
|
pSchema = tCloneSSchemaWrapper(&schema);
|
||||||
|
if (extSchema != NULL) *extSchema = metaCloneSExtSchema(&me);
|
||||||
tDecoderClear(&dc);
|
tDecoderClear(&dc);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -664,7 +667,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
|
||||||
STSchema *pTSchema = NULL;
|
STSchema *pTSchema = NULL;
|
||||||
SSchemaWrapper *pSW = NULL;
|
SSchemaWrapper *pSW = NULL;
|
||||||
|
|
||||||
pSW = metaGetTableSchema(pMeta, uid, sver, lock);
|
pSW = metaGetTableSchema(pMeta, uid, sver, lock, NULL, NULL);
|
||||||
if (!pSW) return NULL;
|
if (!pSW) return NULL;
|
||||||
|
|
||||||
pTSchema = tBuildTSchema(pSW->pSchema, pSW->nCols, pSW->version);
|
pTSchema = tBuildTSchema(pSW->pSchema, pSW->nCols, pSW->version);
|
||||||
|
|
|
@ -593,7 +593,7 @@ int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
|
||||||
|
|
||||||
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) {
|
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) {
|
||||||
bool ret = false;
|
bool ret = false;
|
||||||
SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1);
|
SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL, NULL);
|
||||||
if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
|
if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
|
||||||
ret = true;
|
ret = true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -283,7 +283,7 @@ void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
bool ret = false;
|
bool ret = false;
|
||||||
SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1);
|
SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL, NULL);
|
||||||
if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
|
if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
|
||||||
ret = true;
|
ret = true;
|
||||||
}
|
}
|
||||||
|
@ -336,6 +336,7 @@ void tqReaderClose(STqReader* pReader) {
|
||||||
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
|
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metaFreeSExtSchema(pReader->extSchema);
|
||||||
if (pReader->pColIdList) {
|
if (pReader->pColIdList) {
|
||||||
taosArrayDestroy(pReader->pColIdList);
|
taosArrayDestroy(pReader->pColIdList);
|
||||||
}
|
}
|
||||||
|
@ -597,7 +598,7 @@ END:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) {
|
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask, SExtSchema* extSchema) {
|
||||||
if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
|
if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
@ -620,6 +621,9 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap
|
||||||
pDst->pSchema[j++] = pSrc->pSchema[i];
|
pDst->pSchema[j++] = pSrc->pSchema[i];
|
||||||
SColumnInfoData colInfo =
|
SColumnInfoData colInfo =
|
||||||
createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
|
createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
|
||||||
|
if (extSchema != NULL) {
|
||||||
|
decimalFromTypeMod(extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
|
||||||
|
}
|
||||||
code = blockDataAppendColInfo(pBlock, &colInfo);
|
code = blockDataAppendColInfo(pBlock, &colInfo);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -653,6 +657,9 @@ static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, c
|
||||||
SSchema* pColSchema = &pSchema->pSchema[i];
|
SSchema* pColSchema = &pSchema->pSchema[i];
|
||||||
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
|
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
|
||||||
|
|
||||||
|
if (pReader->extSchema != NULL) {
|
||||||
|
decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
|
||||||
|
}
|
||||||
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
|
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
blockDataFreeRes(pBlock);
|
blockDataFreeRes(pBlock);
|
||||||
|
@ -741,8 +748,8 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
|
||||||
if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
|
if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
|
||||||
(pReader->cachedSchemaVer != sversion)) {
|
(pReader->cachedSchemaVer != sversion)) {
|
||||||
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
|
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
|
||||||
|
metaFreeSExtSchema(pReader->extSchema);
|
||||||
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
|
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL, &pReader->extSchema);
|
||||||
if (pReader->pSchemaWrapper == NULL) {
|
if (pReader->pSchemaWrapper == NULL) {
|
||||||
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
|
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
|
||||||
"version %d, possibly dropped table",
|
"version %d, possibly dropped table",
|
||||||
|
@ -901,7 +908,7 @@ static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData,
|
||||||
pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
|
pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
|
||||||
TQ_NULL_GO_TO_END(pSW);
|
TQ_NULL_GO_TO_END(pSW);
|
||||||
|
|
||||||
TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pSchemaWrapper, assigned));
|
TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pSchemaWrapper, assigned, pReader->extSchema));
|
||||||
tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
|
tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
|
||||||
(int32_t)taosArrayGetSize(block->pDataBlock));
|
(int32_t)taosArrayGetSize(block->pDataBlock));
|
||||||
|
|
||||||
|
@ -1119,7 +1126,8 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* block
|
||||||
pReader->lastBlkUid = uid;
|
pReader->lastBlkUid = uid;
|
||||||
|
|
||||||
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
|
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
|
||||||
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
|
metaFreeSExtSchema(pReader->extSchema);
|
||||||
|
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime, &pReader->extSchema);
|
||||||
if (pReader->pSchemaWrapper == NULL) {
|
if (pReader->pSchemaWrapper == NULL) {
|
||||||
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
|
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
|
||||||
pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
|
pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
|
||||||
|
|
|
@ -760,7 +760,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
|
int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
|
||||||
SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0);
|
SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0, NULL, NULL);
|
||||||
if (pSW) {
|
if (pSW) {
|
||||||
*num = pSW->nCols;
|
*num = pSW->nCols;
|
||||||
tDeleteSchemaWrapper(pSW);
|
tDeleteSchemaWrapper(pSW);
|
||||||
|
|
|
@ -25,6 +25,7 @@ class TDTestCase:
|
||||||
tdSql.query("select * from d1")
|
tdSql.query("select * from d1")
|
||||||
tdSql.checkRows(1)
|
tdSql.checkRows(1)
|
||||||
tdSql.checkData(0, 1, 120)
|
tdSql.checkData(0, 1, 120)
|
||||||
|
tdSql.checkData(0, 4, 2.32)
|
||||||
|
|
||||||
tdSql.query("select * from d2")
|
tdSql.query("select * from d2")
|
||||||
tdSql.checkRows(1)
|
tdSql.checkRows(1)
|
||||||
|
|
|
@ -23,6 +23,13 @@ class TDTestCase:
|
||||||
tdSql.init(conn.cursor())
|
tdSql.init(conn.cursor())
|
||||||
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||||
|
|
||||||
|
def checkData(self):
|
||||||
|
tdSql.execute('use db_taosx')
|
||||||
|
tdSql.query("select * from ct0")
|
||||||
|
tdSql.checkRows(2)
|
||||||
|
tdSql.checkData(0, 4, 23.23)
|
||||||
|
|
||||||
|
return
|
||||||
def run(self):
|
def run(self):
|
||||||
buildPath = tdCom.getBuildPath()
|
buildPath = tdCom.getBuildPath()
|
||||||
cmdStr = '%s/build/bin/tmq_write_raw_test'%(buildPath)
|
cmdStr = '%s/build/bin/tmq_write_raw_test'%(buildPath)
|
||||||
|
@ -41,6 +48,7 @@ class TDTestCase:
|
||||||
tdLog.info(cmdStr)
|
tdLog.info(cmdStr)
|
||||||
os.system(cmdStr)
|
os.system(cmdStr)
|
||||||
|
|
||||||
|
self.checkData()
|
||||||
return
|
return
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
|
|
@ -42,6 +42,23 @@ static void msg_process(TAOS_RES* msg) {
|
||||||
printf("-----------topic-------------: %s\n", tmq_get_topic_name(msg));
|
printf("-----------topic-------------: %s\n", tmq_get_topic_name(msg));
|
||||||
printf("db: %s\n", tmq_get_db_name(msg));
|
printf("db: %s\n", tmq_get_db_name(msg));
|
||||||
printf("vg: %d\n", tmq_get_vgroup_id(msg));
|
printf("vg: %d\n", tmq_get_vgroup_id(msg));
|
||||||
|
if (strcmp(tmq_get_db_name(msg), "db_query") == 0){
|
||||||
|
TAOS_ROW row = NULL;
|
||||||
|
int32_t cnt = 0;
|
||||||
|
while ((row = taos_fetch_row(msg))) {
|
||||||
|
|
||||||
|
int numFields = taos_num_fields(msg);
|
||||||
|
TAOS_FIELD *fields = taos_fetch_fields(msg);
|
||||||
|
|
||||||
|
for (int i = 0; i < numFields; ++i) {
|
||||||
|
if (IS_DECIMAL_TYPE(fields[i].type)) {
|
||||||
|
ASSERT(strcmp(row[i], "3.122") == 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
TAOS* pConn = use_db();
|
TAOS* pConn = use_db();
|
||||||
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META || tmq_get_res_type(msg) == TMQ_RES_METADATA) {
|
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META || tmq_get_res_type(msg) == TMQ_RES_METADATA) {
|
||||||
char* result = tmq_get_json_meta(msg);
|
char* result = tmq_get_json_meta(msg);
|
||||||
|
@ -62,7 +79,7 @@ static void msg_process(TAOS_RES* msg) {
|
||||||
|
|
||||||
int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
|
int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
|
||||||
pRes = taos_query(pConn,
|
pRes = taos_query(pConn,
|
||||||
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
|
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16), c4 decimal(10,3)) tags(t1 int, t3 "
|
||||||
"nchar(8), t4 bool)");
|
"nchar(8), t4 bool)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
|
||||||
|
@ -77,7 +94,7 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "insert into ct0 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a')");
|
pRes = taos_query(pConn, "insert into ct0 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a', 3.2)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
|
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -98,7 +115,7 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "insert into ct1 using st1(t1) tags(2000) values(1626006833600, 3, 4, 'b')");
|
pRes = taos_query(pConn, "insert into ct1 using st1(t1) tags(2000) values(1626006833600, 3, 4, 'b', 4.32)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -112,14 +129,14 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "insert into ct0 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a')");
|
pRes = taos_query(pConn, "insert into ct0 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a', 23.23)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
|
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "insert into ct1 using st1(t1) tags(2000) values(1626006833600, 3, 4, 'b')");
|
pRes = taos_query(pConn, "insert into ct1 using st1(t1) tags(2000) values(1626006833600, 3, 4, 'b', 43.53)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -128,8 +145,8 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
|
||||||
|
|
||||||
pRes = taos_query(
|
pRes = taos_query(
|
||||||
pConn,
|
pConn,
|
||||||
"insert into ct3 using st1(t1) tags(3000) values(1626006833600, 5, 6, 'c') ct1 using st1(t1) tags(2000) values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, "
|
"insert into ct3 using st1(t1) tags(3000) values(1626006833600, 5, 6, 'c', 43.53) ct1 using st1(t1) tags(2000) values(1626006833601, 2, 3, 'sds', 43.53) (1626006833602, 4, 5, "
|
||||||
"'ddd') ct0 using st1 tags(1000, \"ttt\", true) values(1626006833603, 4, 3, 'hwj') ct1 using st1(t1) tags(2000) values(now+5s, 23, 32, 's21ds')");
|
"'ddd', 43.53) ct0 using st1 tags(1000, \"ttt\", true) values(1626006833603, 4, 3, 'hwj', 43.53) ct1 using st1(t1) tags(2000) values(now+5s, 23, 32, 's21ds', 43.53)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
|
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -196,6 +213,36 @@ int32_t init_env() {
|
||||||
|
|
||||||
buildDatabase(pConn, pRes);
|
buildDatabase(pConn, pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "drop database if exists db_query");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "create database if not exists db_query vgroups 1 wal_retention_period 3600");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
// create and insert another stable
|
||||||
|
pRes = taos_query(pConn,
|
||||||
|
"create stable if not exists db_query.st11 (ts timestamp, c1 int, c2 float, c3 binary(16), c4 decimal(9,3)) tags(t1 int, t3 "
|
||||||
|
"nchar(8), t4 bool)");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "insert into db_query.ct10 using db_query.st11 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a', 3.122)");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -215,6 +262,13 @@ int32_t create_topic() {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "create topic topic_query as select * from db_query.st11");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("failed to create topic topic_query, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -254,6 +308,7 @@ tmq_t* build_consumer() {
|
||||||
tmq_list_t* build_topic_list() {
|
tmq_list_t* build_topic_list() {
|
||||||
tmq_list_t* topic_list = tmq_list_new();
|
tmq_list_t* topic_list = tmq_list_new();
|
||||||
tmq_list_append(topic_list, "topic_db");
|
tmq_list_append(topic_list, "topic_db");
|
||||||
|
tmq_list_append(topic_list, "topic_query");
|
||||||
return topic_list;
|
return topic_list;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue