diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index b04e81c193..427e238a7b 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -115,6 +115,30 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch return string; } +static int32_t setCompressOption(cJSON* json, uint32_t para) { + uint8_t encode = COMPRESS_L1_TYPE_U32(para); + if (encode != 0) { + const char* encodeStr = columnEncodeStr(encode); + cJSON* encodeJson = cJSON_CreateString(encodeStr); + cJSON_AddItemToObject(json, "encode", encodeJson); + return 0; + } + uint8_t compress = COMPRESS_L2_TYPE_U32(para); + if (compress != 0) { + const char* compressStr = columnCompressStr(compress); + cJSON* compressJson = cJSON_CreateString(compressStr); + cJSON_AddItemToObject(json, "compress", compressJson); + return 0; + } + uint8_t level = COMPRESS_L2_TYPE_LEVEL_U32(para); + if (level != 0) { + const char* levelStr = columnLevelStr(level); + cJSON* levelJson = cJSON_CreateString(levelStr); + cJSON_AddItemToObject(json, "level", levelJson); + return 0; + } + return 0; +} static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { SMAlterStbReq req = {0}; cJSON* json = NULL; @@ -199,6 +223,13 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { cJSON_AddItemToObject(json, "colNewName", colNewName); break; } + case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: { + TAOS_FIELD* field = taosArrayGet(req.pFields, 0); + cJSON* colName = cJSON_CreateString(field->name); + cJSON_AddItemToObject(json, "colName", colName); + setCompressOption(json, field->bytes); + break; + } default: break; } @@ -568,6 +599,12 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) { cJSON_AddItemToObject(json, "colValueNull", isNullCJson); break; } + case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: { + cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); + cJSON_AddItemToObject(json, "colName", colName); + setCompressOption(json, vAlterTbReq.compress); + break; + } default: break; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 6c2358a1d7..45b0b6ac2b 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -69,7 +69,7 @@ static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq); static int32_t tDecodeSBatchDeleteReqCommon(SDecoder *pDecoder, SBatchDeleteReq *pReq); static int32_t tEncodeTableTSMAInfoRsp(SEncoder *pEncoder, const STableTSMAInfoRsp *pRsp); -static int32_t tDecodeTableTSMAInfoRsp(SDecoder* pDecoder, STableTSMAInfoRsp* pRsp); +static int32_t tDecodeTableTSMAInfoRsp(SDecoder *pDecoder, STableTSMAInfoRsp *pRsp); int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { if (pMsg == NULL) { @@ -895,8 +895,8 @@ int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq if (tEncodeI64(&encoder, pReq->normSourceTbUid) < 0) return -1; if (tEncodeI32(&encoder, taosArrayGetSize(pReq->pVgroupVerList)) < 0) return -1; - for(int32_t i = 0; i < taosArrayGetSize(pReq->pVgroupVerList); ++i) { - SVgroupVer* p = taosArrayGet(pReq->pVgroupVerList, i); + for (int32_t i = 0; i < taosArrayGetSize(pReq->pVgroupVerList); ++i) { + SVgroupVer *p = taosArrayGet(pReq->pVgroupVerList, i); if (tEncodeI32(&encoder, p->vgId) < 0) return -1; if (tEncodeI64(&encoder, p->ver) < 0) return -1; } @@ -8000,7 +8000,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea } } if (!tDecodeIsEnd(&decoder)) { - if (tDecodeI64(&decoder, &pReq->smaId)< 0) return -1; + if (tDecodeI64(&decoder, &pReq->smaId) < 0) return -1; } tEndDecode(&decoder); @@ -8445,8 +8445,8 @@ static int32_t tDecodeSVDropTbRsp(SDecoder *pCoder, SVDropTbRsp *pReq) { } int32_t tEncodeSVDropTbBatchReq(SEncoder *pCoder, const SVDropTbBatchReq *pReq) { - int32_t nReqs = taosArrayGetSize(pReq->pArray); - SVDropTbReq *pDropTbReq; + int32_t nReqs = taosArrayGetSize(pReq->pArray); + SVDropTbReq *pDropTbReq; if (tStartEncode(pCoder) < 0) return -1; @@ -8709,6 +8709,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) { } break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: + if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1; if (tEncodeU32(pEncoder, pReq->compress) < 0) return -1; break; default: @@ -8763,6 +8764,7 @@ static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq) } break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: + if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; if (tDecodeU32(pDecoder, &pReq->compress) < 0) return -1; break; default: @@ -9200,7 +9202,7 @@ int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRspCommon *pRsp) int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const void *pRsp) { if (tEncodeMqDataRspCommon(pEncoder, pRsp) < 0) return -1; - if (tEncodeI64(pEncoder, ((SMqDataRsp*)pRsp)->sleepTime) < 0) return -1; + if (tEncodeI64(pEncoder, ((SMqDataRsp *)pRsp)->sleepTime) < 0) return -1; return 0; } @@ -9253,7 +9255,7 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRspCommon *pRsp) { int32_t tDecodeMqDataRsp(SDecoder *pDecoder, void *pRsp) { if (tDecodeMqDataRspCommon(pDecoder, pRsp) < 0) return -1; if (!tDecodeIsEnd(pDecoder)) { - if (tDecodeI64(pDecoder, &((SMqDataRsp*)pRsp)->sleepTime) < 0) return -1; + if (tDecodeI64(pDecoder, &((SMqDataRsp *)pRsp)->sleepTime) < 0) return -1; } return 0; @@ -9272,9 +9274,7 @@ static void tDeleteMqDataRspCommon(void *rsp) { tOffsetDestroy(&pRsp->rspOffset); } -void tDeleteMqDataRsp(void *rsp) { - tDeleteMqDataRspCommon(rsp); -} +void tDeleteMqDataRsp(void *rsp) { tDeleteMqDataRspCommon(rsp); } int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const void *rsp) { if (tEncodeMqDataRspCommon(pEncoder, rsp) < 0) return -1; @@ -9300,7 +9300,7 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, void *rsp) { pRsp->createTableLen = taosArrayInit(pRsp->createTableNum, sizeof(int32_t)); pRsp->createTableReq = taosArrayInit(pRsp->createTableNum, sizeof(void *)); for (int32_t i = 0; i < pRsp->createTableNum; i++) { - void * pCreate = NULL; + void *pCreate = NULL; uint64_t len = 0; if (tDecodeBinaryAlloc(pDecoder, &pCreate, &len) < 0) return -1; int32_t l = (int32_t)len; @@ -10114,7 +10114,7 @@ void setFieldWithOptions(SFieldWithOptions *fieldWithOptions, SField *field) { fieldWithOptions->type = field->type; strncpy(fieldWithOptions->name, field->name, TSDB_COL_NAME_LEN); } -int32_t tSerializeTableTSMAInfoReq(void* buf, int32_t bufLen, const STableTSMAInfoReq* pReq) { +int32_t tSerializeTableTSMAInfoReq(void *buf, int32_t bufLen, const STableTSMAInfoReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -10129,13 +10129,13 @@ int32_t tSerializeTableTSMAInfoReq(void* buf, int32_t bufLen, const STableTSMAIn return tlen; } -int32_t tDeserializeTableTSMAInfoReq(void* buf, int32_t bufLen, STableTSMAInfoReq* pReq) { +int32_t tDeserializeTableTSMAInfoReq(void *buf, int32_t bufLen, STableTSMAInfoReq *pReq) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); if (tStartDecode(&decoder) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; - if (tDecodeI8(&decoder, (uint8_t*)&pReq->fetchingWithTsmaName) < 0) return -1; + if (tDecodeI8(&decoder, (uint8_t *)&pReq->fetchingWithTsmaName) < 0) return -1; tEndDecode(&decoder); @@ -10143,7 +10143,7 @@ int32_t tDeserializeTableTSMAInfoReq(void* buf, int32_t bufLen, STableTSMAInfoRe return 0; } -static int32_t tEncodeTableTSMAInfo(SEncoder* pEncoder, const STableTSMAInfo* pTsmaInfo) { +static int32_t tEncodeTableTSMAInfo(SEncoder *pEncoder, const STableTSMAInfo *pTsmaInfo) { if (tEncodeCStr(pEncoder, pTsmaInfo->name) < 0) return -1; if (tEncodeU64(pEncoder, pTsmaInfo->tsmaId) < 0) return -1; if (tEncodeCStr(pEncoder, pTsmaInfo->tb) < 0) return -1; @@ -10160,7 +10160,7 @@ static int32_t tEncodeTableTSMAInfo(SEncoder* pEncoder, const STableTSMAInfo* pT int32_t size = pTsmaInfo->pFuncs ? pTsmaInfo->pFuncs->size : 0; if (tEncodeI32(pEncoder, size) < 0) return -1; for (int32_t i = 0; i < size; ++i) { - STableTSMAFuncInfo* pFuncInfo = taosArrayGet(pTsmaInfo->pFuncs, i); + STableTSMAFuncInfo *pFuncInfo = taosArrayGet(pTsmaInfo->pFuncs, i); if (tEncodeI32(pEncoder, pFuncInfo->funcId) < 0) return -1; if (tEncodeI16(pEncoder, pFuncInfo->colId) < 0) return -1; } @@ -10168,13 +10168,13 @@ static int32_t tEncodeTableTSMAInfo(SEncoder* pEncoder, const STableTSMAInfo* pT size = pTsmaInfo->pTags ? pTsmaInfo->pTags->size : 0; if (tEncodeI32(pEncoder, size) < 0) return -1; for (int32_t i = 0; i < size; ++i) { - const SSchema* pSchema = taosArrayGet(pTsmaInfo->pTags, i); + const SSchema *pSchema = taosArrayGet(pTsmaInfo->pTags, i); if (tEncodeSSchema(pEncoder, pSchema) < 0) return -1; } size = pTsmaInfo->pUsedCols ? pTsmaInfo->pUsedCols->size : 0; if (tEncodeI32(pEncoder, size) < 0) return -1; for (int32_t i = 0; i < size; ++i) { - const SSchema* pSchema = taosArrayGet(pTsmaInfo->pUsedCols, i); + const SSchema *pSchema = taosArrayGet(pTsmaInfo->pUsedCols, i); if (tEncodeSSchema(pEncoder, pSchema) < 0) return -1; } @@ -10187,7 +10187,7 @@ static int32_t tEncodeTableTSMAInfo(SEncoder* pEncoder, const STableTSMAInfo* pT return 0; } -static int32_t tDecodeTableTSMAInfo(SDecoder* pDecoder, STableTSMAInfo* pTsmaInfo) { +static int32_t tDecodeTableTSMAInfo(SDecoder *pDecoder, STableTSMAInfo *pTsmaInfo) { if (tDecodeCStrTo(pDecoder, pTsmaInfo->name) < 0) return -1; if (tDecodeU64(pDecoder, &pTsmaInfo->tsmaId) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTsmaInfo->tb) < 0) return -1; @@ -10219,7 +10219,7 @@ static int32_t tDecodeTableTSMAInfo(SDecoder* pDecoder, STableTSMAInfo* pTsmaInf if (!pTsmaInfo->pTags) return -1; for (int32_t i = 0; i < size; ++i) { SSchema schema = {0}; - if(tDecodeSSchema(pDecoder, &schema) < 0) return -1; + if (tDecodeSSchema(pDecoder, &schema) < 0) return -1; taosArrayPush(pTsmaInfo->pTags, &schema); } } @@ -10239,7 +10239,7 @@ static int32_t tDecodeTableTSMAInfo(SDecoder* pDecoder, STableTSMAInfo* pTsmaInf if (tDecodeI64(pDecoder, &pTsmaInfo->reqTs) < 0) return -1; if (tDecodeI64(pDecoder, &pTsmaInfo->rspTs) < 0) return -1; if (tDecodeI64(pDecoder, &pTsmaInfo->delayDuration) < 0) return -1; - if (tDecodeI8(pDecoder, (int8_t*)&pTsmaInfo->fillHistoryFinished) < 0) return -1; + if (tDecodeI8(pDecoder, (int8_t *)&pTsmaInfo->fillHistoryFinished) < 0) return -1; return 0; } @@ -10247,13 +10247,13 @@ static int32_t tEncodeTableTSMAInfoRsp(SEncoder *pEncoder, const STableTSMAInfoR int32_t size = pRsp->pTsmas ? pRsp->pTsmas->size : 0; if (tEncodeI32(pEncoder, size) < 0) return -1; for (int32_t i = 0; i < size; ++i) { - STableTSMAInfo* pInfo = taosArrayGetP(pRsp->pTsmas, i); + STableTSMAInfo *pInfo = taosArrayGetP(pRsp->pTsmas, i); if (tEncodeTableTSMAInfo(pEncoder, pInfo) < 0) return -1; } return 0; } -static int32_t tDecodeTableTSMAInfoRsp(SDecoder* pDecoder, STableTSMAInfoRsp* pRsp) { +static int32_t tDecodeTableTSMAInfoRsp(SDecoder *pDecoder, STableTSMAInfoRsp *pRsp) { int32_t size = 0; if (tDecodeI32(pDecoder, &size) < 0) return -1; if (size <= 0) return 0; @@ -10268,7 +10268,7 @@ static int32_t tDecodeTableTSMAInfoRsp(SDecoder* pDecoder, STableTSMAInfoRsp* pR return 0; } -int32_t tSerializeTableTSMAInfoRsp(void* buf, int32_t bufLen, const STableTSMAInfoRsp* pRsp) { +int32_t tSerializeTableTSMAInfoRsp(void *buf, int32_t bufLen, const STableTSMAInfoRsp *pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -10282,7 +10282,7 @@ int32_t tSerializeTableTSMAInfoRsp(void* buf, int32_t bufLen, const STableTSMAIn return tlen; } -int32_t tDeserializeTableTSMAInfoRsp(void* buf, int32_t bufLen, STableTSMAInfoRsp* pRsp) { +int32_t tDeserializeTableTSMAInfoRsp(void *buf, int32_t bufLen, STableTSMAInfoRsp *pRsp) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); @@ -10295,7 +10295,7 @@ int32_t tDeserializeTableTSMAInfoRsp(void* buf, int32_t bufLen, STableTSMAInfoRs return 0; } -void tFreeTableTSMAInfo(void* p) { +void tFreeTableTSMAInfo(void *p) { STableTSMAInfo *pTsmaInfo = p; if (pTsmaInfo) { taosArrayDestroy(pTsmaInfo->pFuncs); @@ -10305,20 +10305,20 @@ void tFreeTableTSMAInfo(void* p) { } } -void tFreeAndClearTableTSMAInfo(void* p) { - STableTSMAInfo* pTsmaInfo = (STableTSMAInfo*)p; +void tFreeAndClearTableTSMAInfo(void *p) { + STableTSMAInfo *pTsmaInfo = (STableTSMAInfo *)p; if (pTsmaInfo) { tFreeTableTSMAInfo(pTsmaInfo); taosMemoryFree(pTsmaInfo); } } -int32_t tCloneTbTSMAInfo(STableTSMAInfo* pInfo, STableTSMAInfo** pRes) { +int32_t tCloneTbTSMAInfo(STableTSMAInfo *pInfo, STableTSMAInfo **pRes) { int32_t code = TSDB_CODE_SUCCESS; if (NULL == pInfo) { return TSDB_CODE_SUCCESS; } - STableTSMAInfo* pRet = taosMemoryCalloc(1, sizeof(STableTSMAInfo)); + STableTSMAInfo *pRet = taosMemoryCalloc(1, sizeof(STableTSMAInfo)); if (!pRet) return TSDB_CODE_OUT_OF_MEMORY; *pRet = *pInfo; @@ -10357,7 +10357,7 @@ static int32_t tEncodeStreamProgressReq(SEncoder *pEncoder, const SStreamProgres return 0; } -int32_t tSerializeStreamProgressReq(void* buf, int32_t bufLen, const SStreamProgressReq* pReq) { +int32_t tSerializeStreamProgressReq(void *buf, int32_t bufLen, const SStreamProgressReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -10371,7 +10371,7 @@ int32_t tSerializeStreamProgressReq(void* buf, int32_t bufLen, const SStreamProg return tlen; } -static int32_t tDecodeStreamProgressReq(SDecoder* pDecoder, SStreamProgressReq* pReq) { +static int32_t tDecodeStreamProgressReq(SDecoder *pDecoder, SStreamProgressReq *pReq) { if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->fetchIdx) < 0) return -1; @@ -10379,7 +10379,7 @@ static int32_t tDecodeStreamProgressReq(SDecoder* pDecoder, SStreamProgressReq* return 0; } -int32_t tDeserializeStreamProgressReq(void* buf, int32_t bufLen, SStreamProgressReq* pReq) { +int32_t tDeserializeStreamProgressReq(void *buf, int32_t bufLen, SStreamProgressReq *pReq) { SDecoder decoder = {0}; tDecoderInit(&decoder, (char *)buf, bufLen); @@ -10392,7 +10392,7 @@ int32_t tDeserializeStreamProgressReq(void* buf, int32_t bufLen, SStreamProgress return 0; } -static int32_t tEncodeStreamProgressRsp(SEncoder* pEncoder, const SStreamProgressRsp* pRsp) { +static int32_t tEncodeStreamProgressRsp(SEncoder *pEncoder, const SStreamProgressRsp *pRsp) { if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->vgId) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->fillHisFinished) < 0) return -1; @@ -10402,7 +10402,7 @@ static int32_t tEncodeStreamProgressRsp(SEncoder* pEncoder, const SStreamProgres return 0; } -int32_t tSerializeStreamProgressRsp(void* buf, int32_t bufLen, const SStreamProgressRsp* pRsp) { +int32_t tSerializeStreamProgressRsp(void *buf, int32_t bufLen, const SStreamProgressRsp *pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -10416,17 +10416,17 @@ int32_t tSerializeStreamProgressRsp(void* buf, int32_t bufLen, const SStreamProg return tlen; } -static int32_t tDecodeStreamProgressRsp(SDecoder* pDecoder, SStreamProgressRsp* pRsp) { +static int32_t tDecodeStreamProgressRsp(SDecoder *pDecoder, SStreamProgressRsp *pRsp) { if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->vgId) < 0) return -1; - if (tDecodeI8(pDecoder, (int8_t*)&pRsp->fillHisFinished) < 0) return -1; + if (tDecodeI8(pDecoder, (int8_t *)&pRsp->fillHisFinished) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->progressDelay) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->fetchIdx) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->subFetchIdx) < 0) return -1; return 0; } -int32_t tDeserializeSStreamProgressRsp(void* buf, int32_t bufLen, SStreamProgressRsp* pRsp) { +int32_t tDeserializeSStreamProgressRsp(void *buf, int32_t bufLen, SStreamProgressRsp *pRsp) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); @@ -10440,22 +10440,22 @@ int32_t tDeserializeSStreamProgressRsp(void* buf, int32_t bufLen, SStreamProgres } int32_t tEncodeSMDropTbReqOnSingleVg(SEncoder *pEncoder, const SMDropTbReqsOnSingleVg *pReq) { - const SVgroupInfo* pVgInfo = &pReq->vgInfo; + const SVgroupInfo *pVgInfo = &pReq->vgInfo; if (tEncodeI32(pEncoder, pVgInfo->vgId) < 0) return -1; if (tEncodeU32(pEncoder, pVgInfo->hashBegin) < 0) return -1; if (tEncodeU32(pEncoder, pVgInfo->hashEnd) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pVgInfo->epSet) < 0) return -1; if (tEncodeI32(pEncoder, pVgInfo->numOfTable) < 0) return -1; - int32_t size = pReq->pTbs ? pReq->pTbs->size: 0; + int32_t size = pReq->pTbs ? pReq->pTbs->size : 0; if (tEncodeI32(pEncoder, size) < 0) return -1; for (int32_t i = 0; i < size; ++i) { - const SVDropTbReq* pInfo = taosArrayGet(pReq->pTbs, i); + const SVDropTbReq *pInfo = taosArrayGet(pReq->pTbs, i); if (tEncodeSVDropTbReq(pEncoder, pInfo) < 0) return -1; } return 0; } -int32_t tDecodeSMDropTbReqOnSingleVg(SDecoder* pDecoder, SMDropTbReqsOnSingleVg* pReq) { +int32_t tDecodeSMDropTbReqOnSingleVg(SDecoder *pDecoder, SMDropTbReqsOnSingleVg *pReq) { if (tDecodeI32(pDecoder, &pReq->vgInfo.vgId) < 0) return -1; if (tDecodeU32(pDecoder, &pReq->vgInfo.hashBegin) < 0) return -1; if (tDecodeU32(pDecoder, &pReq->vgInfo.hashEnd) < 0) return -1; @@ -10477,18 +10477,18 @@ int32_t tDecodeSMDropTbReqOnSingleVg(SDecoder* pDecoder, SMDropTbReqsOnSingleVg* } void tFreeSMDropTbReqOnSingleVg(void *p) { - SMDropTbReqsOnSingleVg* pReq = p; + SMDropTbReqsOnSingleVg *pReq = p; taosArrayDestroy(pReq->pTbs); } -int32_t tSerializeSMDropTbsReq(void* buf, int32_t bufLen, const SMDropTbsReq* pReq){ +int32_t tSerializeSMDropTbsReq(void *buf, int32_t bufLen, const SMDropTbsReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); tStartEncode(&encoder); int32_t size = pReq->pVgReqs ? pReq->pVgReqs->size : 0; if (tEncodeI32(&encoder, size) < 0) return -1; for (int32_t i = 0; i < size; ++i) { - SMDropTbReqsOnSingleVg* pVgReq = taosArrayGet(pReq->pVgReqs, i); + SMDropTbReqsOnSingleVg *pVgReq = taosArrayGet(pReq->pVgReqs, i); if (tEncodeSMDropTbReqOnSingleVg(&encoder, pVgReq) < 0) return -1; } tEndEncode(&encoder); @@ -10497,7 +10497,7 @@ int32_t tSerializeSMDropTbsReq(void* buf, int32_t bufLen, const SMDropTbsReq* pR return tlen; } -int32_t tDeserializeSMDropTbsReq(void* buf, int32_t bufLen, SMDropTbsReq* pReq) { +int32_t tDeserializeSMDropTbsReq(void *buf, int32_t bufLen, SMDropTbsReq *pReq) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); tStartDecode(&decoder); @@ -10518,12 +10518,12 @@ int32_t tDeserializeSMDropTbsReq(void* buf, int32_t bufLen, SMDropTbsReq* pReq) return 0; } -void tFreeSMDropTbsReq(void* p) { - SMDropTbsReq* pReq = p; +void tFreeSMDropTbsReq(void *p) { + SMDropTbsReq *pReq = p; taosArrayDestroyEx(pReq->pVgReqs, tFreeSMDropTbReqOnSingleVg); } -int32_t tEncodeVFetchTtlExpiredTbsRsp(SEncoder* pCoder, const SVFetchTtlExpiredTbsRsp* pRsp) { +int32_t tEncodeVFetchTtlExpiredTbsRsp(SEncoder *pCoder, const SVFetchTtlExpiredTbsRsp *pRsp) { if (tEncodeI32(pCoder, pRsp->vgId) < 0) return -1; int32_t size = pRsp->pExpiredTbs ? pRsp->pExpiredTbs->size : 0; if (tEncodeI32(pCoder, size) < 0) return -1; @@ -10533,7 +10533,7 @@ int32_t tEncodeVFetchTtlExpiredTbsRsp(SEncoder* pCoder, const SVFetchTtlExpiredT return 0; } -int32_t tDecodeVFetchTtlExpiredTbsRsp(SDecoder* pCoder, SVFetchTtlExpiredTbsRsp* pRsp) { +int32_t tDecodeVFetchTtlExpiredTbsRsp(SDecoder *pCoder, SVFetchTtlExpiredTbsRsp *pRsp) { if (tDecodeI32(pCoder, &pRsp->vgId) < 0) return -1; int32_t size = 0; if (tDecodeI32(pCoder, &size) < 0) return -1; @@ -10549,7 +10549,7 @@ int32_t tDecodeVFetchTtlExpiredTbsRsp(SDecoder* pCoder, SVFetchTtlExpiredTbsRsp* return 0; } -void tFreeFetchTtlExpiredTbsRsp(void* p) { - SVFetchTtlExpiredTbsRsp* pRsp = p; +void tFreeFetchTtlExpiredTbsRsp(void *p) { + SVFetchTtlExpiredTbsRsp *pRsp = p; taosArrayDestroy(pRsp->pExpiredTbs); } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 06093cbaf8..0a2a4617fc 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2179,6 +2179,7 @@ int32_t copyDataAt(RocksdbCfInst* pSrc, STaskDbWrapper* pDst, int8_t i) { } _EXIT: + rocksdb_writebatch_destroy(wb); rocksdb_iter_destroy(pIter); rocksdb_readoptions_destroy(pRdOpt); taosMemoryFree(err); diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index da39a2423b..da67b68c1c 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -188,7 +188,7 @@ int32_t l2ComressInitImpl_zlib(char *lossyColumns, float fPrecision, double dPre int32_t l2CompressImpl_zlib(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, const char type, int8_t lvl) { uLongf dstLen = outputSize - 1; - int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, 9); + int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, lvl); if (ret == Z_OK) { output[0] = 1; return dstLen + 1;