add tmq interface
This commit is contained in:
parent
0acc5f485a
commit
f0147770e0
|
@ -16,6 +16,8 @@
|
||||||
#include "cJSON.h"
|
#include "cJSON.h"
|
||||||
#include "clientInt.h"
|
#include "clientInt.h"
|
||||||
#include "parser.h"
|
#include "parser.h"
|
||||||
|
#include "tcol.h"
|
||||||
|
#include "tcompression.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "tdef.h"
|
#include "tdef.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
@ -27,7 +29,7 @@
|
||||||
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
|
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
|
||||||
|
|
||||||
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
|
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
|
||||||
int8_t t) {
|
int8_t t, SColCmprWrapper* pColCmprRow) {
|
||||||
char* string = NULL;
|
char* string = NULL;
|
||||||
cJSON* json = cJSON_CreateObject();
|
cJSON* json = cJSON_CreateObject();
|
||||||
if (json == NULL) {
|
if (json == NULL) {
|
||||||
|
@ -67,6 +69,23 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch
|
||||||
cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY);
|
cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY);
|
||||||
cJSON_AddItemToObject(column, "isPrimarykey", isPk);
|
cJSON_AddItemToObject(column, "isPrimarykey", isPk);
|
||||||
cJSON_AddItemToArray(columns, column);
|
cJSON_AddItemToArray(columns, column);
|
||||||
|
|
||||||
|
if (pColCmprRow == NULL || pColCmprRow->nCols <= i) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
SColCmpr* pColCmpr = pColCmprRow->pColCmpr + i;
|
||||||
|
const char* encode = columnEncodeStr(COMPRESS_L1_TYPE_U32(pColCmpr->alg));
|
||||||
|
const char* compress = columnCompressStr(COMPRESS_L2_TYPE_U32(pColCmpr->alg));
|
||||||
|
const char* level = columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(pColCmpr->alg));
|
||||||
|
|
||||||
|
cJSON* encodeJson = cJSON_CreateString(encode);
|
||||||
|
cJSON_AddItemToObject(column, "encode", encodeJson);
|
||||||
|
|
||||||
|
cJSON* compressJson = cJSON_CreateString(compress);
|
||||||
|
cJSON_AddItemToObject(column, "compress", compressJson);
|
||||||
|
|
||||||
|
cJSON* levelJson = cJSON_CreateString(level);
|
||||||
|
cJSON_AddItemToObject(column, "level", levelJson);
|
||||||
}
|
}
|
||||||
cJSON_AddItemToObject(json, "columns", columns);
|
cJSON_AddItemToObject(json, "columns", columns);
|
||||||
|
|
||||||
|
@ -205,7 +224,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
|
||||||
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
|
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
|
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr);
|
||||||
_err:
|
_err:
|
||||||
uDebug("create stable return, sql json:%s", string);
|
uDebug("create stable return, sql json:%s", string);
|
||||||
tDecoderClear(&coder);
|
tDecoderClear(&coder);
|
||||||
|
@ -373,8 +392,8 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
|
||||||
if (pCreateReq->type == TSDB_CHILD_TABLE) {
|
if (pCreateReq->type == TSDB_CHILD_TABLE) {
|
||||||
string = buildCreateCTableJson(req.pReqs, req.nReqs);
|
string = buildCreateCTableJson(req.pReqs, req.nReqs);
|
||||||
} else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
|
} else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
|
||||||
string =
|
string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid,
|
||||||
buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
|
TSDB_NORMAL_TABLE, &pCreateReq->colCmpr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -714,8 +733,8 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
// build create stable
|
// build create stable
|
||||||
pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions));
|
pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions));
|
||||||
for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
|
for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
|
||||||
SSchema* pSchema = req.schemaRow.pSchema + i;
|
SSchema* pSchema = req.schemaRow.pSchema + i;
|
||||||
SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
|
SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
|
||||||
strcpy(field.name, pSchema->name);
|
strcpy(field.name, pSchema->name);
|
||||||
// todo get active compress param
|
// todo get active compress param
|
||||||
setDefaultOptionsForField(&field);
|
setDefaultOptionsForField(&field);
|
||||||
|
@ -1346,10 +1365,10 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
}
|
}
|
||||||
pVgData->vg = pInfo;
|
pVgData->vg = pInfo;
|
||||||
|
|
||||||
int tlen = 0;
|
int tlen = 0;
|
||||||
req.source = TD_REQ_FROM_TAOX;
|
req.source = TD_REQ_FROM_TAOX;
|
||||||
tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
|
tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
|
||||||
if(code != 0){
|
if (code != 0) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -1365,7 +1384,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
SEncoder coder = {0};
|
SEncoder coder = {0};
|
||||||
tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
|
tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
|
||||||
code = tEncodeSVAlterTbReq(&coder, &req);
|
code = tEncodeSVAlterTbReq(&coder, &req);
|
||||||
if(code != 0){
|
if (code != 0) {
|
||||||
tEncoderClear(&coder);
|
tEncoderClear(&coder);
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto end;
|
goto end;
|
||||||
|
@ -1631,7 +1650,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||||
rspObj.common.resType = RES_TYPE__TMQ;
|
rspObj.common.resType = RES_TYPE__TMQ;
|
||||||
|
|
||||||
int8_t dataVersion = *(int8_t*)data;
|
int8_t dataVersion = *(int8_t*)data;
|
||||||
if (dataVersion >= MQ_DATA_RSP_VERSION){
|
if (dataVersion >= MQ_DATA_RSP_VERSION) {
|
||||||
data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
|
data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
|
||||||
dataLen -= sizeof(int8_t) + sizeof(int32_t);
|
dataLen -= sizeof(int8_t) + sizeof(int32_t);
|
||||||
}
|
}
|
||||||
|
@ -1777,7 +1796,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
rspObj.common.resType = RES_TYPE__TMQ_METADATA;
|
rspObj.common.resType = RES_TYPE__TMQ_METADATA;
|
||||||
|
|
||||||
int8_t dataVersion = *(int8_t*)data;
|
int8_t dataVersion = *(int8_t*)data;
|
||||||
if (dataVersion >= MQ_DATA_RSP_VERSION){
|
if (dataVersion >= MQ_DATA_RSP_VERSION) {
|
||||||
data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
|
data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
|
||||||
dataLen -= sizeof(int8_t) + sizeof(int32_t);
|
dataLen -= sizeof(int8_t) + sizeof(int32_t);
|
||||||
}
|
}
|
||||||
|
@ -1913,7 +1932,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
|
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
|
||||||
}
|
}
|
||||||
void* rawData = getRawDataFromRes(pRetrieve);
|
void* rawData = getRawDataFromRes(pRetrieve);
|
||||||
char err[ERR_MSG_LEN] = {0};
|
char err[ERR_MSG_LEN] = {0};
|
||||||
code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true, err, ERR_MSG_LEN);
|
code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true, err, ERR_MSG_LEN);
|
||||||
taosMemoryFree(fields);
|
taosMemoryFree(fields);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1982,9 +2001,9 @@ char* tmq_get_json_meta(TAOS_RES* res) {
|
||||||
|
|
||||||
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
|
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
|
||||||
|
|
||||||
static int32_t getOffSetLen(const void *rsp){
|
static int32_t getOffSetLen(const void* rsp) {
|
||||||
const SMqDataRspCommon *pRsp = rsp;
|
const SMqDataRspCommon* pRsp = rsp;
|
||||||
SEncoder coder = {0};
|
SEncoder coder = {0};
|
||||||
tEncoderInit(&coder, NULL, 0);
|
tEncoderInit(&coder, NULL, 0);
|
||||||
if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1;
|
if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1;
|
||||||
if (tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset) < 0) return -1;
|
if (tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset) < 0) return -1;
|
||||||
|
@ -1993,13 +2012,13 @@ static int32_t getOffSetLen(const void *rsp){
|
||||||
return pos;
|
return pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef int32_t __encode_func__(SEncoder *pEncoder, const void *pRsp);
|
typedef int32_t __encode_func__(SEncoder* pEncoder, const void* pRsp);
|
||||||
|
|
||||||
static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_raw_data* raw){
|
static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_raw_data* raw) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
tEncodeSize(encodeFunc, rspObj, len, code);
|
tEncodeSize(encodeFunc, rspObj, len, code);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
@ -2007,17 +2026,17 @@ static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_ra
|
||||||
}
|
}
|
||||||
len += sizeof(int8_t) + sizeof(int32_t);
|
len += sizeof(int8_t) + sizeof(int32_t);
|
||||||
buf = taosMemoryCalloc(1, len);
|
buf = taosMemoryCalloc(1, len);
|
||||||
if(buf == NULL){
|
if (buf == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto FAILED;
|
goto FAILED;
|
||||||
}
|
}
|
||||||
tEncoderInit(&encoder, buf, len);
|
tEncoderInit(&encoder, buf, len);
|
||||||
if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) {
|
if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
goto FAILED;
|
goto FAILED;
|
||||||
}
|
}
|
||||||
int32_t offsetLen = getOffSetLen(rspObj);
|
int32_t offsetLen = getOffSetLen(rspObj);
|
||||||
if(offsetLen <= 0){
|
if (offsetLen <= 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
goto FAILED;
|
goto FAILED;
|
||||||
}
|
}
|
||||||
|
@ -2025,7 +2044,7 @@ static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_ra
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
goto FAILED;
|
goto FAILED;
|
||||||
}
|
}
|
||||||
if(encodeFunc(&encoder, rspObj) < 0){
|
if (encodeFunc(&encoder, rspObj) < 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
goto FAILED;
|
goto FAILED;
|
||||||
}
|
}
|
||||||
|
@ -2053,7 +2072,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
|
||||||
uDebug("tmq get raw type meta:%p", raw);
|
uDebug("tmq get raw type meta:%p", raw);
|
||||||
} else if (TD_RES_TMQ(res)) {
|
} else if (TD_RES_TMQ(res)) {
|
||||||
SMqRspObj* rspObj = ((SMqRspObj*)res);
|
SMqRspObj* rspObj = ((SMqRspObj*)res);
|
||||||
if(encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->rsp, raw) != 0){
|
if (encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->rsp, raw) != 0) {
|
||||||
uError("tmq get raw type error:%d", terrno);
|
uError("tmq get raw type error:%d", terrno);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -2062,7 +2081,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
|
||||||
} else if (TD_RES_TMQ_METADATA(res)) {
|
} else if (TD_RES_TMQ_METADATA(res)) {
|
||||||
SMqTaosxRspObj* rspObj = ((SMqTaosxRspObj*)res);
|
SMqTaosxRspObj* rspObj = ((SMqTaosxRspObj*)res);
|
||||||
|
|
||||||
if(encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->rsp, raw) != 0){
|
if (encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->rsp, raw) != 0) {
|
||||||
uError("tmq get raw type error:%d", terrno);
|
uError("tmq get raw type error:%d", terrno);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
|
@ -274,8 +274,6 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
me.name = pReq->name;
|
me.name = pReq->name;
|
||||||
me.stbEntry.schemaRow = pReq->schemaRow;
|
me.stbEntry.schemaRow = pReq->schemaRow;
|
||||||
me.stbEntry.schemaTag = pReq->schemaTag;
|
me.stbEntry.schemaTag = pReq->schemaTag;
|
||||||
// me.stbEntry.colCmpr = pReq->colCmpr;
|
|
||||||
// me.stbEntry.colCmpr = pReq->
|
|
||||||
if (pReq->rollup) {
|
if (pReq->rollup) {
|
||||||
TABLE_SET_ROLLUP(me.flags);
|
TABLE_SET_ROLLUP(me.flags);
|
||||||
me.stbEntry.rsmaParam = pReq->rsmaParam;
|
me.stbEntry.rsmaParam = pReq->rsmaParam;
|
||||||
|
|
Loading…
Reference in New Issue