diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index eb33263286..330f16bfd2 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -16,6 +16,8 @@ #include "cJSON.h" #include "clientInt.h" #include "parser.h" +#include "tcol.h" +#include "tcompression.h" #include "tdatablock.h" #include "tdef.h" #include "tglobal.h" @@ -27,7 +29,7 @@ static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); } static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, - int8_t t) { + int8_t t, SColCmprWrapper* pColCmprRow) { char* string = NULL; cJSON* json = cJSON_CreateObject(); if (json == NULL) { @@ -67,6 +69,23 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY); cJSON_AddItemToObject(column, "isPrimarykey", isPk); cJSON_AddItemToArray(columns, column); + + if (pColCmprRow == NULL || pColCmprRow->nCols <= i) { + continue; + } + SColCmpr* pColCmpr = pColCmprRow->pColCmpr + i; + const char* encode = columnEncodeStr(COMPRESS_L1_TYPE_U32(pColCmpr->alg)); + const char* compress = columnCompressStr(COMPRESS_L2_TYPE_U32(pColCmpr->alg)); + const char* level = columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(pColCmpr->alg)); + + cJSON* encodeJson = cJSON_CreateString(encode); + cJSON_AddItemToObject(column, "encode", encodeJson); + + cJSON* compressJson = cJSON_CreateString(compress); + cJSON_AddItemToObject(column, "compress", compressJson); + + cJSON* levelJson = cJSON_CreateString(level); + cJSON_AddItemToObject(column, "level", levelJson); } cJSON_AddItemToObject(json, "columns", columns); @@ -205,7 +224,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) { if (tDecodeSVCreateStbReq(&coder, &req) < 0) { goto _err; } - string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE); + string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr); _err: uDebug("create stable return, sql json:%s", string); tDecoderClear(&coder); @@ -373,8 +392,8 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) { if (pCreateReq->type == TSDB_CHILD_TABLE) { string = buildCreateCTableJson(req.pReqs, req.nReqs); } else if (pCreateReq->type == TSDB_NORMAL_TABLE) { - string = - buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE); + string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, + TSDB_NORMAL_TABLE, &pCreateReq->colCmpr); } } @@ -714,8 +733,8 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { // build create stable pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions)); for (int32_t i = 0; i < req.schemaRow.nCols; i++) { - SSchema* pSchema = req.schemaRow.pSchema + i; - SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes}; + SSchema* pSchema = req.schemaRow.pSchema + i; + SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes}; strcpy(field.name, pSchema->name); // todo get active compress param setDefaultOptionsForField(&field); @@ -1346,10 +1365,10 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { } pVgData->vg = pInfo; - int tlen = 0; + int tlen = 0; req.source = TD_REQ_FROM_TAOX; tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code); - if(code != 0){ + if (code != 0) { code = TSDB_CODE_OUT_OF_MEMORY; goto end; } @@ -1365,7 +1384,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { SEncoder coder = {0}; tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead)); code = tEncodeSVAlterTbReq(&coder, &req); - if(code != 0){ + if (code != 0) { tEncoderClear(&coder); code = TSDB_CODE_OUT_OF_MEMORY; goto end; @@ -1631,7 +1650,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { rspObj.common.resType = RES_TYPE__TMQ; int8_t dataVersion = *(int8_t*)data; - if (dataVersion >= MQ_DATA_RSP_VERSION){ + if (dataVersion >= MQ_DATA_RSP_VERSION) { data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t)); dataLen -= sizeof(int8_t) + sizeof(int32_t); } @@ -1777,7 +1796,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) rspObj.common.resType = RES_TYPE__TMQ_METADATA; int8_t dataVersion = *(int8_t*)data; - if (dataVersion >= MQ_DATA_RSP_VERSION){ + if (dataVersion >= MQ_DATA_RSP_VERSION) { data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t)); dataLen -= sizeof(int8_t) + sizeof(int32_t); } @@ -1913,7 +1932,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); } void* rawData = getRawDataFromRes(pRetrieve); - char err[ERR_MSG_LEN] = {0}; + char err[ERR_MSG_LEN] = {0}; code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true, err, ERR_MSG_LEN); taosMemoryFree(fields); if (code != TSDB_CODE_SUCCESS) { @@ -1982,9 +2001,9 @@ char* tmq_get_json_meta(TAOS_RES* res) { void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); } -static int32_t getOffSetLen(const void *rsp){ - const SMqDataRspCommon *pRsp = rsp; - SEncoder coder = {0}; +static int32_t getOffSetLen(const void* rsp) { + const SMqDataRspCommon* pRsp = rsp; + SEncoder coder = {0}; tEncoderInit(&coder, NULL, 0); if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1; if (tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset) < 0) return -1; @@ -1993,13 +2012,13 @@ static int32_t getOffSetLen(const void *rsp){ return pos; } -typedef int32_t __encode_func__(SEncoder *pEncoder, const void *pRsp); +typedef int32_t __encode_func__(SEncoder* pEncoder, const void* pRsp); -static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_raw_data* raw){ - int32_t len = 0; +static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_raw_data* raw) { + int32_t len = 0; int32_t code = 0; SEncoder encoder = {0}; - void* buf = NULL; + void* buf = NULL; tEncodeSize(encodeFunc, rspObj, len, code); if (code < 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -2007,17 +2026,17 @@ static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_ra } len += sizeof(int8_t) + sizeof(int32_t); buf = taosMemoryCalloc(1, len); - if(buf == NULL){ + if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto FAILED; } tEncoderInit(&encoder, buf, len); if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) { - terrno = TSDB_CODE_INVALID_MSG; - goto FAILED; + terrno = TSDB_CODE_INVALID_MSG; + goto FAILED; } int32_t offsetLen = getOffSetLen(rspObj); - if(offsetLen <= 0){ + if (offsetLen <= 0) { terrno = TSDB_CODE_INVALID_MSG; goto FAILED; } @@ -2025,7 +2044,7 @@ static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_ra terrno = TSDB_CODE_INVALID_MSG; goto FAILED; } - if(encodeFunc(&encoder, rspObj) < 0){ + if (encodeFunc(&encoder, rspObj) < 0) { terrno = TSDB_CODE_INVALID_MSG; goto FAILED; } @@ -2053,7 +2072,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { uDebug("tmq get raw type meta:%p", raw); } else if (TD_RES_TMQ(res)) { SMqRspObj* rspObj = ((SMqRspObj*)res); - if(encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->rsp, raw) != 0){ + if (encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->rsp, raw) != 0) { uError("tmq get raw type error:%d", terrno); return terrno; } @@ -2062,7 +2081,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { } else if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj* rspObj = ((SMqTaosxRspObj*)res); - if(encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->rsp, raw) != 0){ + if (encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->rsp, raw) != 0) { uError("tmq get raw type error:%d", terrno); return terrno; } diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index e9ddaf9fca..c9246b67f8 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -274,8 +274,6 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { me.name = pReq->name; me.stbEntry.schemaRow = pReq->schemaRow; me.stbEntry.schemaTag = pReq->schemaTag; - // me.stbEntry.colCmpr = pReq->colCmpr; - // me.stbEntry.colCmpr = pReq-> if (pReq->rollup) { TABLE_SET_ROLLUP(me.flags); me.stbEntry.rsmaParam = pReq->rsmaParam;