Merge pull request #25462 from taosdata/enh/supportTmqInterface

Enh/support tmq interface
This commit is contained in:
Hongze Cheng 2024-04-26 13:05:33 +08:00 committed by GitHub
commit e678d2508f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 92 additions and 54 deletions

View File

@ -115,6 +115,30 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch
return string;
}
static int32_t setCompressOption(cJSON* json, uint32_t para) {
uint8_t encode = COMPRESS_L1_TYPE_U32(para);
if (encode != 0) {
const char* encodeStr = columnEncodeStr(encode);
cJSON* encodeJson = cJSON_CreateString(encodeStr);
cJSON_AddItemToObject(json, "encode", encodeJson);
return 0;
}
uint8_t compress = COMPRESS_L2_TYPE_U32(para);
if (compress != 0) {
const char* compressStr = columnCompressStr(compress);
cJSON* compressJson = cJSON_CreateString(compressStr);
cJSON_AddItemToObject(json, "compress", compressJson);
return 0;
}
uint8_t level = COMPRESS_L2_TYPE_LEVEL_U32(para);
if (level != 0) {
const char* levelStr = columnLevelStr(level);
cJSON* levelJson = cJSON_CreateString(levelStr);
cJSON_AddItemToObject(json, "level", levelJson);
return 0;
}
return 0;
}
static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
SMAlterStbReq req = {0};
cJSON* json = NULL;
@ -199,6 +223,13 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
cJSON_AddItemToObject(json, "colNewName", colNewName);
break;
}
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
cJSON* colName = cJSON_CreateString(field->name);
cJSON_AddItemToObject(json, "colName", colName);
setCompressOption(json, field->bytes);
break;
}
default:
break;
}
@ -568,6 +599,12 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
cJSON_AddItemToObject(json, "colValueNull", isNullCJson);
break;
}
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
cJSON_AddItemToObject(json, "colName", colName);
setCompressOption(json, vAlterTbReq.compress);
break;
}
default:
break;
}

View File

@ -8709,6 +8709,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
}
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS:
if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1;
if (tEncodeU32(pEncoder, pReq->compress) < 0) return -1;
break;
default:
@ -8763,6 +8764,7 @@ static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq)
}
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS:
if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1;
if (tDecodeU32(pDecoder, &pReq->compress) < 0) return -1;
break;
default:
@ -9272,9 +9274,7 @@ static void tDeleteMqDataRspCommon(void *rsp) {
tOffsetDestroy(&pRsp->rspOffset);
}
void tDeleteMqDataRsp(void *rsp) {
tDeleteMqDataRspCommon(rsp);
}
void tDeleteMqDataRsp(void *rsp) { tDeleteMqDataRspCommon(rsp); }
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const void *rsp) {
if (tEncodeMqDataRspCommon(pEncoder, rsp) < 0) return -1;

View File

@ -2179,6 +2179,7 @@ int32_t copyDataAt(RocksdbCfInst* pSrc, STaskDbWrapper* pDst, int8_t i) {
}
_EXIT:
rocksdb_writebatch_destroy(wb);
rocksdb_iter_destroy(pIter);
rocksdb_readoptions_destroy(pRdOpt);
taosMemoryFree(err);

View File

@ -188,7 +188,7 @@ int32_t l2ComressInitImpl_zlib(char *lossyColumns, float fPrecision, double dPre
int32_t l2CompressImpl_zlib(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
const char type, int8_t lvl) {
uLongf dstLen = outputSize - 1;
int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, 9);
int32_t ret = compress2((Bytef *)(output + 1), (uLongf *)&dstLen, (Bytef *)input, (uLong)inputSize, lvl);
if (ret == Z_OK) {
output[0] = 1;
return dstLen + 1;