diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 7c78928333..82f7285357 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -25,11 +25,12 @@ #include "tref.h" #include "ttimer.h" -static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, - int8_t t, cJSON* tables) { +static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, + int8_t t) { + char* string = NULL; cJSON* json = cJSON_CreateObject(); if (json == NULL) { - return; + return NULL; } cJSON* type = cJSON_CreateString("create"); cJSON_AddItemToObject(json, "type", type); @@ -86,7 +87,10 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche cJSON_AddItemToArray(tags, tag); } cJSON_AddItemToObject(json, "tags", tags); - cJSON_AddItemToArray(tables, json); + + string = cJSON_PrintUnformatted(json); + cJSON_Delete(json); + return string; } static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { @@ -185,7 +189,6 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) { SVCreateStbReq req = {0}; SDecoder coder; char* string = NULL; - cJSON* tables = cJSON_CreateArray(); // decode and process req void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); @@ -195,11 +198,9 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) { if (tDecodeSVCreateStbReq(&coder, &req) < 0) { goto _err; } - buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, tables); - string = cJSON_PrintUnformatted(tables); + string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE); _err: - cJSON_Delete(tables); tDecoderClear(&coder); return string; } @@ -226,23 +227,16 @@ _err: return string; } -static void buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id, uint8_t tagNum, cJSON* tables) { - SArray* pTagVals = NULL; - cJSON* json = cJSON_CreateObject(); - if (json == NULL) { - return; - } - cJSON* type = cJSON_CreateString("create"); - cJSON_AddItemToObject(json, "type", type); - // char cid[32] = {0}; - // sprintf(cid, "%"PRIi64, id); - // cJSON* cid_ = cJSON_CreateString(cid); - // cJSON_AddItemToObject(json, "id", cid_); +static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq){ + STag* pTag = (STag*)pCreateReq->ctb.pTag; + char* sname = pCreateReq->ctb.name; + char* name = pCreateReq->name; + SArray* tagName = pCreateReq->ctb.tagName; + int64_t id = pCreateReq->uid; + uint8_t tagNum = pCreateReq->ctb.tagNum; cJSON* tableName = cJSON_CreateString(name); cJSON_AddItemToObject(json, "tableName", tableName); - cJSON* tableType = cJSON_CreateString("child"); - cJSON_AddItemToObject(json, "tableType", tableType); cJSON* using = cJSON_CreateString(sname); cJSON_AddItemToObject(json, "using", using); cJSON* tagNumJson = cJSON_CreateNumber(tagNum); @@ -251,6 +245,7 @@ static void buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* t // cJSON_AddItemToObject(json, "version", version); cJSON* tags = cJSON_CreateArray(); + SArray* pTagVals = NULL; int32_t code = tTagToValArray(pTag, &pTagVals); if (code) { goto end; @@ -309,10 +304,38 @@ static void buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* t cJSON_AddItemToArray(tags, tag); } -end: + end: cJSON_AddItemToObject(json, "tags", tags); taosArrayDestroy(pTagVals); - cJSON_AddItemToArray(tables, json); +} + +static char* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) { + char* string = NULL; + cJSON* json = cJSON_CreateObject(); + if (json == NULL) { + return NULL; + } + cJSON* type = cJSON_CreateString("create"); + cJSON_AddItemToObject(json, "type", type); + // char cid[32] = {0}; + // sprintf(cid, "%"PRIi64, id); + // cJSON* cid_ = cJSON_CreateString(cid); + // cJSON_AddItemToObject(json, "id", cid_); + + cJSON* tableType = cJSON_CreateString("child"); + cJSON_AddItemToObject(json, "tableType", tableType); + + buildChildElement(json, pCreateReq); + cJSON* createList = cJSON_CreateArray(); + for(int i = 0; nReqs > 1 && i < nReqs; i++){ + cJSON* create = cJSON_CreateObject(); + buildChildElement(create, pCreateReq + i); + cJSON_AddItemToArray(createList, create); + } + cJSON_AddItemToObject(json, "createList", createList); + string = cJSON_PrintUnformatted(json); + cJSON_Delete(json); + return string; } static char* processCreateTable(SMqMetaRsp* metaRsp) { @@ -329,54 +352,47 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) { } // loop to create table - cJSON* tables = cJSON_CreateArray(); - for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { - pCreateReq = req.pReqs + iReq; + if (req.nReqs > 0) { + pCreateReq = req.pReqs; if (pCreateReq->type == TSDB_CHILD_TABLE) { - buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name, - pCreateReq->ctb.tagName, pCreateReq->uid, pCreateReq->ctb.tagNum, tables); + string = buildCreateCTableJson(req.pReqs, req.nReqs); } else if (pCreateReq->type == TSDB_NORMAL_TABLE) { - buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE, tables); + string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE); } } - string = cJSON_PrintUnformatted(tables); - cJSON_Delete(tables); _exit: tDecoderClear(&decoder); return string; } static char* processAutoCreateTable(STaosxRsp* rsp) { - SDecoder decoder = {0}; - SVCreateTbReq* pCreateReq; + if(rsp->createTableNum == 0) return NULL; + + SDecoder* decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder)); + SVCreateTbReq* pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq)); char* string = NULL; - // loop to create table - cJSON* tables = cJSON_CreateArray(); for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) { // decode void** data = taosArrayGet(rsp->createTableReq, iReq); int32_t *len = taosArrayGet(rsp->createTableLen, iReq); - tDecoderInit(&decoder, *data, *len); - if (tDecodeSVCreateTbReq(&decoder, pCreateReq) < 0) { - tDecoderClear(&decoder); + tDecoderInit(&decoder[iReq], *data, *len); + if (tDecodeSVCreateTbReq(&decoder[iReq], pCreateReq + iReq) < 0) { goto _exit; } - if (pCreateReq->type == TSDB_CHILD_TABLE) { - buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name, - pCreateReq->ctb.tagName, pCreateReq->uid, pCreateReq->ctb.tagNum, tables); - } else if (pCreateReq->type == TSDB_NORMAL_TABLE) { - buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE, tables); - } - tDecoderClear(&decoder); + ASSERT(pCreateReq[iReq].type == TSDB_CHILD_TABLE); } + string = buildCreateCTableJson(pCreateReq, rsp->createTableNum); - string = cJSON_PrintUnformatted(tables); _exit: - cJSON_Delete(tables); + for(int i = 0; i < rsp->createTableNum; i++){ + tDecoderClear(&decoder[i]); + } + taosMemoryFree(decoder); + taosMemoryFree(pCreateReq); return string; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index a84c36790a..047191ec27 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1470,11 +1470,11 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) { tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); pRspObj->vgId = pWrapper->vgHandle->vgId; pRspObj->resIter = -1; - memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqTaosxRspObj)); + memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp)); pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; - if (!pWrapper->dataRsp.withSchema) { + if (!pWrapper->taosxRsp.withSchema) { setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); } @@ -1786,6 +1786,9 @@ const char* tmq_get_topic_name(TAOS_RES* res) { } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; return strchr(pMetaRspObj->topic, '.') + 1; + } else if (TD_RES_TMQ_METADATA(res)) { + SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res; + return strchr(pRspObj->topic, '.') + 1; } else { return NULL; } @@ -1798,6 +1801,9 @@ const char* tmq_get_db_name(TAOS_RES* res) { } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; return strchr(pMetaRspObj->db, '.') + 1; + } else if (TD_RES_TMQ_METADATA(res)) { + SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res; + return strchr(pRspObj->db, '.') + 1; } else { return NULL; } @@ -1810,6 +1816,9 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; return pMetaRspObj->vgId; + } else if (TD_RES_TMQ_META(res)) { + SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res; + return pRspObj->vgId; } else { return -1; } diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index f917b9159e..4458a70748 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -54,12 +54,12 @@ static void msg_process(TAOS_RES* msg) { printf("db: %s\n", tmq_get_db_name(msg)); printf("vg: %d\n", tmq_get_vgroup_id(msg)); TAOS *pConn = use_db(); - if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) { + if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META || tmq_get_res_type(msg) == TMQ_RES_METADATA) { char* result = tmq_get_json_meta(msg); if (result) { printf("meta result: %s\n", result); } - if(g_fp){ + if(g_fp && result){ taosFprintfFile(g_fp, result); taosFprintfFile(g_fp, "\n"); }