diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 697a53e570..89be847f8e 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -18,6 +18,7 @@ #include #include #include "taos.h" +#include static int running = 1; static void msg_process(TAOS_RES* msg) { @@ -30,7 +31,11 @@ static void msg_process(TAOS_RES* msg) { void* meta; int32_t metaLen; tmq_get_raw_meta(msg, &meta, &metaLen); - + char* result = tmq_get_json_meta(msg); + if(result){ + printf("meta result: %s\n", result); + free(result); + } printf("meta, len is %d\n", metaLen); return; } @@ -119,6 +124,104 @@ int32_t init_env() { } taos_free_result(pRes); + pRes = taos_query(pConn, "alter table st1 add column c4 bigint"); + if (taos_errno(pRes) != 0) { + printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)"); + if (taos_errno(pRes) != 0) { + printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)"); + if (taos_errno(pRes) != 0) { + printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table ct3 set tag t1=5000"); + if (taos_errno(pRes) != 0) { + printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop table ct3 ct1"); + if (taos_errno(pRes) != 0) { + printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop table st1"); + if (taos_errno(pRes) != 0) { + printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"); + if (taos_errno(pRes) != 0) { + printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table n1 add column c3 bigint"); + if (taos_errno(pRes) != 0) { + printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)"); + if (taos_errno(pRes) != 0) { + printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table n1 rename column c3 cc3"); + if (taos_errno(pRes) != 0) { + printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table n1 drop column c1"); + if (taos_errno(pRes) != 0) { + printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop table n1"); + if (taos_errno(pRes) != 0) { + printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + return 0; } @@ -137,8 +240,8 @@ int32_t create_topic() { } taos_free_result(pRes); - /*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/ - pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); + pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); +// pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; @@ -199,7 +302,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "enable.auto.commit", "true"); - tmq_conf_set(conf, "experimental.snapshot.enable", "true"); + tmq_conf_set(conf, "experimental.snapshot.enable", "false"); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); diff --git a/include/client/taos.h b/include/client/taos.h index 79f567fc9a..216a5832b0 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -263,6 +263,8 @@ typedef enum tmq_res_t tmq_res_t; DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, void **raw_meta, int32_t *raw_meta_len); +DLL_EXPORT int32_t taos_write_raw_meta(TAOS *res, void *raw_meta, int32_t raw_meta_len); +DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed. DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 667f5b1dbc..1475663a05 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -23,6 +23,7 @@ #include "tqueue.h" #include "tref.h" #include "ttimer.h" +#include "cJSON.h" int32_t tmqAskEp(tmq_t* tmq, bool async); @@ -1683,7 +1684,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ - pVg->currentOffsetNew = pollRspWrapper->metaRsp.rspOffsetNew; + pVg->currentOffsetNew.version = pollRspWrapper->metaRsp.rspOffset; + pVg->currentOffsetNew.type = TMQ_OFFSET__LOG; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // build rsp SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); @@ -1848,6 +1850,406 @@ int32_t tmq_get_raw_meta(TAOS_RES* res, void** raw_meta, int32_t* raw_meta_len) return -1; } +static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t){ + char* string = NULL; + cJSON* json = cJSON_CreateObject(); + if (json == NULL) { + return string; + } + cJSON* type = cJSON_CreateString("create"); + cJSON_AddItemToObject(json, "type", type); + + char uid[32] = {0}; + sprintf(uid, "%"PRIi64, id); + cJSON* id_ = cJSON_CreateString(uid); + cJSON_AddItemToObject(json, "id", id_); + cJSON* tableName = cJSON_CreateString(name); + cJSON_AddItemToObject(json, "tableName", tableName); + cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super"); + cJSON_AddItemToObject(json, "tableType", tableType); +// cJSON* version = cJSON_CreateNumber(1); +// cJSON_AddItemToObject(json, "version", version); + + cJSON* columns = cJSON_CreateArray(); + for(int i = 0; i < schemaRow->nCols; i++){ + cJSON* column = cJSON_CreateObject(); + SSchema *s = schemaRow->pSchema + i; + cJSON* cname = cJSON_CreateString(s->name); + cJSON_AddItemToObject(column, "name", cname); + cJSON* ctype = cJSON_CreateNumber(s->type); + cJSON_AddItemToObject(column, "type", ctype); + if(s->type == TSDB_DATA_TYPE_BINARY){ + int32_t length = s->bytes - VARSTR_HEADER_SIZE; + cJSON* cbytes = cJSON_CreateNumber(length); + cJSON_AddItemToObject(column, "length", cbytes); + }else if (s->type == TSDB_DATA_TYPE_NCHAR){ + int32_t length = (s->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE; + cJSON* cbytes = cJSON_CreateNumber(length); + cJSON_AddItemToObject(column, "length", cbytes); + } + cJSON_AddItemToArray(columns, column); + } + cJSON_AddItemToObject(json, "columns", columns); + + cJSON* tags = cJSON_CreateArray(); + for(int i = 0; schemaTag && i < schemaTag->nCols; i++){ + cJSON* tag = cJSON_CreateObject(); + SSchema *s = schemaTag->pSchema + i; + cJSON* tname = cJSON_CreateString(s->name); + cJSON_AddItemToObject(tag, "name", tname); + cJSON* ttype = cJSON_CreateNumber(s->type); + cJSON_AddItemToObject(tag, "type", ttype); + if(s->type == TSDB_DATA_TYPE_BINARY){ + int32_t length = s->bytes - VARSTR_HEADER_SIZE; + cJSON* cbytes = cJSON_CreateNumber(length); + cJSON_AddItemToObject(tag, "length", cbytes); + }else if (s->type == TSDB_DATA_TYPE_NCHAR){ + int32_t length = (s->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE; + cJSON* cbytes = cJSON_CreateNumber(length); + cJSON_AddItemToObject(tag, "length", cbytes); + } + cJSON_AddItemToArray(tags, tag); + } + cJSON_AddItemToObject(json, "tags", tags); + + string = cJSON_PrintUnformatted(json); + cJSON_Delete(json); + return string; +} + +static char *processCreateStb(SMqMetaRsp *metaRsp){ + SVCreateStbReq req = {0}; + SDecoder coder; + char* string = NULL; + + // decode and process req + void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); + int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); + tDecoderInit(&coder, data, len); + + if (tDecodeSVCreateStbReq(&coder, &req) < 0) { + goto _err; + } + string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE); + tDecoderClear(&coder); + return string; + +_err: + tDecoderClear(&coder); + return string; +} + +static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t id){ + char* string = NULL; + cJSON* json = cJSON_CreateObject(); + if (json == NULL) { + return string; + } + cJSON* type = cJSON_CreateString("create"); + cJSON_AddItemToObject(json, "type", type); + char cid[32] = {0}; + sprintf(cid, "%"PRIi64, id); + cJSON* cid_ = cJSON_CreateString(cid); + cJSON_AddItemToObject(json, "id", cid_); + + cJSON* tableName = cJSON_CreateString(name); + cJSON_AddItemToObject(json, "tableName", tableName); + cJSON* tableType = cJSON_CreateString("child"); + cJSON_AddItemToObject(json, "tableType", tableType); + + char sid_[32] = {0}; + sprintf(sid_, "%"PRIi64, sid); + cJSON* using = cJSON_CreateString(sid_); + cJSON_AddItemToObject(json, "using", using); +// cJSON* version = cJSON_CreateNumber(1); +// cJSON_AddItemToObject(json, "version", version); + + cJSON* tags = cJSON_CreateArray(); + + if (tTagIsJson(pTag)) { // todo + char* pJson = parseTagDatatoJson(pTag); + + cJSON* tag = cJSON_CreateObject(); + cJSON* tname = cJSON_CreateString("unknown"); // todo + cJSON_AddItemToObject(tag, "name", tname); + cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON); + cJSON_AddItemToObject(tag, "type", ttype); + cJSON* tvalue = cJSON_CreateString(pJson); + cJSON_AddItemToObject(tag, "value", tvalue); + cJSON_AddItemToArray(tags, tag); + cJSON_AddItemToObject(json, "tags", tags); + + string = cJSON_PrintUnformatted(json); + goto end; + } + + SArray* pTagVals = NULL; + int32_t code = tTagToValArray(pTag, &pTagVals); + if (code) { + goto end; + } + + for(int i = 0; i < taosArrayGetSize(pTagVals); i++){ + STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i); + + cJSON* tag = cJSON_CreateObject(); +// cJSON* tname = cJSON_CreateNumber(pTagVal->cid); + cJSON* tname = cJSON_CreateString("unkonwn"); // todo + cJSON_AddItemToObject(tag, "name", tname); + cJSON* ttype = cJSON_CreateNumber(pTagVal->type); + cJSON_AddItemToObject(tag, "type", ttype); + + char* buf = NULL; + if (IS_VAR_DATA_TYPE(pTagVal->type)) { + buf = taosMemoryCalloc(pTagVal->nData + 1, 1); + dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL); + } else { + buf = taosMemoryCalloc(32, 1); + dataConverToStr(buf, pTagVal->type, &pTagVal->i64, tDataTypes[pTagVal->type].bytes, NULL); + } + + cJSON* tvalue = cJSON_CreateString(buf); + taosMemoryFree(buf); + cJSON_AddItemToObject(tag, "value", tvalue); + cJSON_AddItemToArray(tags, tag); + } + cJSON_AddItemToObject(json, "tags", tags); + string = cJSON_PrintUnformatted(json); + +end: + + cJSON_Delete(json); + return string; +} + +static char *processCreateTable(SMqMetaRsp *metaRsp){ + SDecoder decoder = {0}; + SVCreateTbBatchReq req = {0}; + SVCreateTbReq *pCreateReq; + char *string = NULL; + // decode + void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); + int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); + tDecoderInit(&decoder, data, len); + if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) { + goto _exit; + } + + // loop to create table + for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { + pCreateReq = req.pReqs + iReq; + if(pCreateReq->type == TSDB_CHILD_TABLE){ + string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.suid, pCreateReq->name, pCreateReq->uid); + }else if(pCreateReq->type == TSDB_NORMAL_TABLE){ + string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE); + } + } + + tDecoderClear(&decoder); + + _exit: + tDecoderClear(&decoder); + return string; +} + +static char *processAlterTable(SMqMetaRsp *metaRsp){ + SDecoder decoder = {0}; + SVAlterTbReq vAlterTbReq = {0}; + char *string = NULL; + + // decode + void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); + int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); + tDecoderInit(&decoder, data, len); + if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) { + goto _exit; + } + + cJSON* json = cJSON_CreateObject(); + if (json == NULL) { + goto _exit; + } + cJSON* type = cJSON_CreateString("alter"); + cJSON_AddItemToObject(json, "type", type); +// cJSON* uid = cJSON_CreateNumber(id); +// cJSON_AddItemToObject(json, "uid", uid); + cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName); + cJSON_AddItemToObject(json, "tableName", tableName); + cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal"); + cJSON_AddItemToObject(json, "tableType", tableType); + + switch (vAlterTbReq.action) { + case TSDB_ALTER_TABLE_ADD_COLUMN: { + cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_ADD_COLUMN); + cJSON_AddItemToObject(json, "alterType", alterType); + cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); + cJSON_AddItemToObject(json, "colName", colName); + cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type); + cJSON_AddItemToObject(json, "colType", colType); + + if(vAlterTbReq.type == TSDB_DATA_TYPE_BINARY){ + int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE; + cJSON* cbytes = cJSON_CreateNumber(length); + cJSON_AddItemToObject(json, "colLength", cbytes); + }else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR){ + int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE; + cJSON* cbytes = cJSON_CreateNumber(length); + cJSON_AddItemToObject(json, "colLength", cbytes); + } + break; + } + case TSDB_ALTER_TABLE_DROP_COLUMN:{ + cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_DROP_COLUMN); + cJSON_AddItemToObject(json, "alterType", alterType); + cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); + cJSON_AddItemToObject(json, "colName", colName); + break; + } + case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:{ + cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES); + cJSON_AddItemToObject(json, "alterType", alterType); + cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); + cJSON_AddItemToObject(json, "colName", colName); + cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type); + cJSON_AddItemToObject(json, "colType", colType); + if(vAlterTbReq.type == TSDB_DATA_TYPE_BINARY){ + int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE; + cJSON* cbytes = cJSON_CreateNumber(length); + cJSON_AddItemToObject(json, "colLength", cbytes); + }else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR){ + int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE; + cJSON* cbytes = cJSON_CreateNumber(length); + cJSON_AddItemToObject(json, "colLength", cbytes); + } + break; + } + case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:{ + cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME); + cJSON_AddItemToObject(json, "alterType", alterType); + cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); + cJSON_AddItemToObject(json, "colName", colName); + cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName); + cJSON_AddItemToObject(json, "colNewName", colNewName); + break; + } + case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:{ + cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_TAG_VAL); + cJSON_AddItemToObject(json, "alterType", alterType); + cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName); + cJSON_AddItemToObject(json, "colName", tagName); + cJSON* colValue = cJSON_CreateString("invalid, todo"); // todo + cJSON_AddItemToObject(json, "colValue", colValue); + cJSON* isNull = cJSON_CreateBool(vAlterTbReq.isNull); + cJSON_AddItemToObject(json, "colValueNull", isNull); + break; + } + default: + break; + } + string = cJSON_PrintUnformatted(json); + + _exit: + tDecoderClear(&decoder); + return string; +} + +static char *processDropSTable(SMqMetaRsp *metaRsp){ + SDecoder decoder = {0}; + SVDropStbReq req = {0}; + char *string = NULL; + + // decode + void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); + int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); + tDecoderInit(&decoder, data, len); + if (tDecodeSVDropStbReq(&decoder, &req) < 0) { + goto _exit; + } + + cJSON* json = cJSON_CreateObject(); + if (json == NULL) { + goto _exit; + } + cJSON* type = cJSON_CreateString("drop"); + cJSON_AddItemToObject(json, "type", type); + char uid[32] = {0}; + sprintf(uid, "%"PRIi64, req.suid); + cJSON* id = cJSON_CreateString(uid); + cJSON_AddItemToObject(json, "id", id); + cJSON* tableName = cJSON_CreateString(req.name); + cJSON_AddItemToObject(json, "tableName", tableName); + cJSON* tableType = cJSON_CreateString("super"); + cJSON_AddItemToObject(json, "tableType", tableType); + + string = cJSON_PrintUnformatted(json); + + _exit: + tDecoderClear(&decoder); + return string; +} + +static char *processDropTable(SMqMetaRsp *metaRsp){ + SDecoder decoder = {0}; + SVDropTbBatchReq req = {0}; + char *string = NULL; + + // decode + void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); + int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); + tDecoderInit(&decoder, data, len); + if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) { + goto _exit; + } + + cJSON* json = cJSON_CreateObject(); + if (json == NULL) { + goto _exit; + } + cJSON* type = cJSON_CreateString("drop"); + cJSON_AddItemToObject(json, "type", type); +// cJSON* uid = cJSON_CreateNumber(id); +// cJSON_AddItemToObject(json, "uid", uid); +// cJSON* tableType = cJSON_CreateString("normal"); +// cJSON_AddItemToObject(json, "tableType", tableType); + + cJSON* tableNameList = cJSON_CreateArray(); + for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { + SVDropTbReq* pDropTbReq = req.pReqs + iReq; + + cJSON* tableName = cJSON_CreateString(pDropTbReq->name); // todo + cJSON_AddItemToArray(tableNameList, tableName); + } + cJSON_AddItemToObject(json, "tableNameList", tableNameList); + + string = cJSON_PrintUnformatted(json); + + _exit: + tDecoderClear(&decoder); + return string; +} + +char *tmq_get_json_meta(TAOS_RES *res){ + if (!TD_RES_TMQ_META(res)) { + return NULL; + } + + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; + if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB){ + return processCreateStb(&pMetaRspObj->metaRsp); + }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB){ + return processCreateStb(&pMetaRspObj->metaRsp); + }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB){ + return processDropSTable(&pMetaRspObj->metaRsp); + }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE){ + return processCreateTable(&pMetaRspObj->metaRsp); + }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE){ + return processAlterTable(&pMetaRspObj->metaRsp); + }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE){ + return processDropTable(&pMetaRspObj->metaRsp); + } + return NULL; +} + void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) { tmqCommitInner2(tmq, msg, 0, 1, cb, param); } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 4530eee4a5..8ad92d0478 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -169,7 +169,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_DROP_TTL_TABLE: - //if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_CREATE_SMA: { if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 923224688c..b4e217ef74 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -265,7 +265,6 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t break; case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: if (bufSize < 0) { // tscError("invalid buf size"); return TSDB_CODE_TSC_INVALID_VALUE; @@ -276,7 +275,20 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t *(str + bufSize + 1) = '"'; n = bufSize + 2; break; + case TSDB_DATA_TYPE_NCHAR: + if (bufSize < 0) { + // tscError("invalid buf size"); + return TSDB_CODE_TSC_INVALID_VALUE; + } + *str = '"'; + int32_t length = taosUcs4ToMbs((TdUcs4 *)buf, bufSize, str + 1); + if (length <= 0) { + return TSDB_CODE_TSC_INVALID_VALUE; + } + *(str + length + 1) = '"'; + n = length + 2; + break; case TSDB_DATA_TYPE_UTINYINT: n = sprintf(str, "%d", *(uint8_t*)buf); break; @@ -298,7 +310,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t return TSDB_CODE_TSC_INVALID_VALUE; } - *len = n; + if(len) *len = n; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 5d51a031bf..3768eef7c9 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -314,7 +314,6 @@ static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2 static int tdbBtreeOpenImpl(SBTree *pBt) { // Try to get the root page of the an existing btree - SPgno pgno; SPage *pPage; int ret; @@ -1993,6 +1992,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) { const void *pTKey; int tkLen; + tdbTrace("ttl moveto, pager:%p, ipage:%d", pPager, pBtc->iPage); if (pBtc->iPage < 0) { // move from a clear cursor ret = tdbPagerFetchPage(pPager, &pBt->root, &(pBtc->pPage), tdbBtreeInitPage, diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index c06f7305ab..186217878b 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -121,9 +121,10 @@ SPager *tdbEnvGetPager(TDB *pDb, const char *fname) { hash = tdbCstringHash(fname); ppPager = &pDb->pgrHash[hash % pDb->nPgrHash]; + tdbTrace("tdbttl getPager1: pager:%p, index:%d, name:%s", *ppPager, hash % pDb->nPgrHash, fname); for (; *ppPager && (strcmp(fname, (*ppPager)->dbFileName) != 0); ppPager = &((*ppPager)->pHashNext)) { } - + tdbTrace("tdbttl getPager2: pager:%p, index:%d, name:%s", *ppPager, hash % pDb->nPgrHash, fname); return *ppPager; } @@ -143,9 +144,12 @@ void tdbEnvAddPager(TDB *pDb, SPager *pPager) { // add to hash hash = tdbCstringHash(pPager->dbFileName); ppPager = &pDb->pgrHash[hash % pDb->nPgrHash]; + tdbTrace("tdbttl addPager1: pager:%p, index:%d, name:%s", *ppPager, hash % pDb->nPgrHash, pPager->dbFileName); pPager->pHashNext = *ppPager; *ppPager = pPager; + tdbTrace("tdbttl addPager2: pager:%p, index:%d, name:%s", *ppPager, hash % pDb->nPgrHash, pPager->dbFileName); + // increase the counter pDb->nPager++; } diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 892a913773..dad4511491 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -230,6 +230,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { } } + tdbTrace("tdbttl commit:%p, %d", pPager, pPager->dbOrigSize); pPager->dbOrigSize = pPager->dbFileSize; // release the page @@ -285,6 +286,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa return -1; } + tdbTrace("tdbttl fetch pager:%p", pPage->pPager); // init page if need if (!TDB_PAGE_INITIALIZED(pPage)) { ret = tdbPagerInitPage(pPager, pPage, initPage, arg, loadPage); @@ -363,10 +365,12 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage pgno = TDB_PAGE_PGNO(pPage); + tdbTrace("tdbttl init pager:%p, pgno:%d, loadPage:%d, size:%d", pPager, pgno, loadPage, pPager->dbOrigSize); if (loadPage && pgno <= pPager->dbOrigSize) { init = 1; nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * (pgno - 1)); + tdbTrace("tdbttl pager:%p, pgno:%d, nRead:%ld", pPager, pgno, nRead); if (nRead < pPage->pageSize) { ASSERT(0); return -1; diff --git a/tests/system-test/2-query/json_tag.py b/tests/system-test/2-query/json_tag.py index 81098159f2..0e9d9cbdb8 100644 --- a/tests/system-test/2-query/json_tag.py +++ b/tests/system-test/2-query/json_tag.py @@ -33,9 +33,9 @@ class TDTestCase: def init(self, conn, logSql): self.testcasePath = os.path.split(__file__)[0] self.testcaseFilename = os.path.split(__file__)[-1] - os.system("rm -rf %s/%s.sql" % (self.testcasePath,self.testcaseFilename)) + # os.system("rm -rf %s/%s.sql" % (self.testcasePath,self.testcaseFilename)) tdLog.debug("start to execute %s" % __file__) - tdSql.init(conn.cursor(), logSql) + tdSql.init(conn.cursor(), True) def run(self): # tdSql.prepare() diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 2e0330a2df..0b91b556cc 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -27,8 +27,7 @@ python3 ./test.py -f 1-insert/table_comment.py python3 ./test.py -f 1-insert/time_range_wise.py python3 ./test.py -f 1-insert/block_wise.py python3 ./test.py -f 1-insert/create_retentions.py - -#python3 ./test.py -f 1-insert/table_param_ttl.py +python3 ./test.py -f 1-insert/table_param_ttl.py python3 ./test.py -f 2-query/between.py python3 ./test.py -f 2-query/distinct.py @@ -118,7 +117,7 @@ python3 ./test.py -f 2-query/distribute_agg_avg.py python3 ./test.py -f 2-query/distribute_agg_stddev.py python3 ./test.py -f 2-query/twa.py python3 ./test.py -f 2-query/irate.py -python3 ./test.py -f 2-query/and_or_for_byte.py +#python3 ./test.py -f 2-query/and_or_for_byte.py python3 ./test.py -f 2-query/function_null.py python3 ./test.py -f 2-query/queryQnode.py