fix:add logic for auto create table in taosX
This commit is contained in:
parent
2f905064e5
commit
a3161bf9c1
|
@ -254,7 +254,7 @@ enum tmq_res_t {
|
|||
TMQ_RES_INVALID = -1,
|
||||
TMQ_RES_DATA = 1,
|
||||
TMQ_RES_TABLE_META = 2,
|
||||
TMQ_RES_TAOSX = 3,
|
||||
TMQ_RES_METADATA = 3,
|
||||
};
|
||||
|
||||
typedef struct tmq_raw_data {
|
||||
|
|
|
@ -52,7 +52,7 @@ enum {
|
|||
RES_TYPE__QUERY = 1,
|
||||
RES_TYPE__TMQ,
|
||||
RES_TYPE__TMQ_META,
|
||||
RES_TYPE__TAOSX,
|
||||
RES_TYPE__TMQ_METADATA,
|
||||
};
|
||||
|
||||
#define SHOW_VARIABLES_RESULT_COLS 2
|
||||
|
@ -60,9 +60,9 @@ enum {
|
|||
#define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
|
||||
|
||||
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
|
||||
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ || *(int8_t*)res == RES_TYPE__TAOSX)
|
||||
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
|
||||
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META)
|
||||
#define TD_RES_TMQ_TAOSX(res) (*(int8_t*)res == RES_TYPE__TAOSX)
|
||||
#define TD_RES_TMQ_METADATA(res) (*(int8_t*)res == RES_TYPE__TMQ_METADATA)
|
||||
|
||||
typedef struct SAppInstInfo SAppInstInfo;
|
||||
|
||||
|
|
|
@ -184,7 +184,7 @@ void taos_free_result(TAOS_RES *res) {
|
|||
SRequestObj *pRequest = (SRequestObj *)res;
|
||||
tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId);
|
||||
destroyRequest(pRequest);
|
||||
} else if (TD_RES_TMQ_TAOSX(res)) {
|
||||
} else if (TD_RES_TMQ_METADATA(res)) {
|
||||
SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res;
|
||||
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
|
||||
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen);
|
||||
|
|
|
@ -25,12 +25,11 @@
|
|||
#include "tref.h"
|
||||
#include "ttimer.h"
|
||||
|
||||
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
|
||||
int8_t t) {
|
||||
char* string = NULL;
|
||||
static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
|
||||
int8_t t, cJSON* tables) {
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
return string;
|
||||
return;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("create");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
|
@ -87,10 +86,7 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch
|
|||
cJSON_AddItemToArray(tags, tag);
|
||||
}
|
||||
cJSON_AddItemToObject(json, "tags", tags);
|
||||
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
cJSON_Delete(json);
|
||||
return string;
|
||||
cJSON_AddItemToArray(tables, json);
|
||||
}
|
||||
|
||||
static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
|
||||
|
@ -189,6 +185,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
|
|||
SVCreateStbReq req = {0};
|
||||
SDecoder coder;
|
||||
char* string = NULL;
|
||||
cJSON* tables = cJSON_CreateArray();
|
||||
|
||||
// decode and process req
|
||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||
|
@ -198,11 +195,11 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
|
|||
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
|
||||
tDecoderClear(&coder);
|
||||
return string;
|
||||
buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, tables);
|
||||
string = cJSON_PrintUnformatted(tables);
|
||||
|
||||
_err:
|
||||
cJSON_Delete(tables);
|
||||
tDecoderClear(&coder);
|
||||
return string;
|
||||
}
|
||||
|
@ -229,12 +226,11 @@ _err:
|
|||
return string;
|
||||
}
|
||||
|
||||
static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id, uint8_t tagNum) {
|
||||
char* string = NULL;
|
||||
static void buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id, uint8_t tagNum, cJSON* tables) {
|
||||
SArray* pTagVals = NULL;
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
return string;
|
||||
return;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("create");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
|
@ -315,10 +311,8 @@ static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray*
|
|||
|
||||
end:
|
||||
cJSON_AddItemToObject(json, "tags", tags);
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
cJSON_Delete(json);
|
||||
taosArrayDestroy(pTagVals);
|
||||
return string;
|
||||
cJSON_AddItemToArray(tables, json);
|
||||
}
|
||||
|
||||
static char* processCreateTable(SMqMetaRsp* metaRsp) {
|
||||
|
@ -335,24 +329,57 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
|
|||
}
|
||||
|
||||
// loop to create table
|
||||
cJSON* tables = cJSON_CreateArray();
|
||||
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||
pCreateReq = req.pReqs + iReq;
|
||||
if (pCreateReq->type == TSDB_CHILD_TABLE) {
|
||||
string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name,
|
||||
pCreateReq->ctb.tagName, pCreateReq->uid, pCreateReq->ctb.tagNum);
|
||||
buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name,
|
||||
pCreateReq->ctb.tagName, pCreateReq->uid, pCreateReq->ctb.tagNum, tables);
|
||||
} else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
|
||||
string =
|
||||
buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
|
||||
buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE, tables);
|
||||
}
|
||||
}
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
string = cJSON_PrintUnformatted(tables);
|
||||
cJSON_Delete(tables);
|
||||
_exit:
|
||||
tDecoderClear(&decoder);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char* processAutoCreateTable(STaosxRsp* rsp) {
|
||||
SDecoder decoder = {0};
|
||||
SVCreateTbReq* pCreateReq;
|
||||
char* string = NULL;
|
||||
|
||||
|
||||
// loop to create table
|
||||
cJSON* tables = cJSON_CreateArray();
|
||||
for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) {
|
||||
// decode
|
||||
void** data = taosArrayGet(rsp->createTableReq, iReq);
|
||||
int32_t *len = taosArrayGet(rsp->createTableLen, iReq);
|
||||
tDecoderInit(&decoder, *data, *len);
|
||||
if (tDecodeSVCreateTbReq(&decoder, pCreateReq) < 0) {
|
||||
tDecoderClear(&decoder);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if (pCreateReq->type == TSDB_CHILD_TABLE) {
|
||||
buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name,
|
||||
pCreateReq->ctb.tagName, pCreateReq->uid, pCreateReq->ctb.tagNum, tables);
|
||||
} else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
|
||||
buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE, tables);
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
}
|
||||
|
||||
string = cJSON_PrintUnformatted(tables);
|
||||
_exit:
|
||||
cJSON_Delete(tables);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char* processAlterTable(SMqMetaRsp* metaRsp) {
|
||||
SDecoder decoder = {0};
|
||||
SVAlterTbReq vAlterTbReq = {0};
|
||||
|
@ -1586,11 +1613,307 @@ end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tmqWriteRaqMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SHashObj* pVgHash = NULL;
|
||||
SQuery* pQuery = NULL;
|
||||
SMqTaosxRspObj rspObj = {0};
|
||||
SDecoder decoder = {0};
|
||||
STableMeta* pTableMeta = NULL;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
|
||||
if (!pRequest) {
|
||||
uError("WriteRaw:createRequest error request is null");
|
||||
return terrno;
|
||||
}
|
||||
|
||||
rspObj.resIter = -1;
|
||||
rspObj.resType = RES_TYPE__TMQ_METADATA;
|
||||
|
||||
tDecoderInit(&decoder, data, dataLen);
|
||||
code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp);
|
||||
if (code != 0) {
|
||||
uError("WriteRaw:decode smqDataRsp error");
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (!pRequest->pDb) {
|
||||
uError("WriteRaw:not use db");
|
||||
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||
goto end;
|
||||
}
|
||||
|
||||
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
taosHashSetFreeFp(pVgHash, destroyVgHash);
|
||||
struct SCatalog* pCatalog = NULL;
|
||||
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("WriteRaw: get gatlog error");
|
||||
goto end;
|
||||
}
|
||||
|
||||
SRequestConnInfo conn = {0};
|
||||
conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
|
||||
conn.requestId = pRequest->requestId;
|
||||
conn.requestObjRefId = pRequest->self;
|
||||
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
|
||||
|
||||
printf("raw data block num:%d\n", rspObj.rsp.blockNum);
|
||||
while (++rspObj.resIter < rspObj.rsp.blockNum) {
|
||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
|
||||
if (!rspObj.rsp.withSchema) {
|
||||
uError("WriteRaw:no schema, iter:%d", rspObj.resIter);
|
||||
goto end;
|
||||
}
|
||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
|
||||
setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols);
|
||||
|
||||
code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("WriteRaw: setQueryResultFromRsp error");
|
||||
goto end;
|
||||
}
|
||||
|
||||
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
|
||||
if (!tbName) {
|
||||
uError("WriteRaw: tbname is null");
|
||||
code = TSDB_CODE_TMQ_INVALID_MSG;
|
||||
goto end;
|
||||
}
|
||||
|
||||
printf("raw data tbname:%s\n", tbName);
|
||||
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
|
||||
strcpy(pName.dbname, pRequest->pDb);
|
||||
strcpy(pName.tname, tbName);
|
||||
|
||||
VgData vgData = {0};
|
||||
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &(vgData.vg));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
|
||||
goto end;
|
||||
}
|
||||
|
||||
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
|
||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||
uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName);
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
continue;
|
||||
}
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbName);
|
||||
goto end;
|
||||
}
|
||||
|
||||
uint16_t fLen = 0;
|
||||
int32_t rowSize = 0;
|
||||
int16_t nVar = 0;
|
||||
for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
|
||||
SSchema* schema = &pTableMeta->schema[i];
|
||||
fLen += TYPE_BYTES[schema->type];
|
||||
rowSize += schema->bytes;
|
||||
if (IS_VAR_DATA_TYPE(schema->type)) {
|
||||
nVar++;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t rows = rspObj.resInfo.numOfRows;
|
||||
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
|
||||
(int32_t)TD_BITMAP_BYTES(pTableMeta->tableInfo.numOfColumns - 1);
|
||||
|
||||
// find schema data info
|
||||
int32_t schemaLen = 0;
|
||||
void* schemaData = NULL;
|
||||
for(int j = 0; j < rspObj.rsp.createTableNum; j++){
|
||||
void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j);
|
||||
int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j);
|
||||
|
||||
SDecoder decoderTmp = {0};
|
||||
SVCreateTbReq* pCreateReq;
|
||||
|
||||
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
|
||||
if (tDecodeSVCreateTbReq(&decoderTmp, pCreateReq) < 0) {
|
||||
tDecoderClear(&decoderTmp);
|
||||
goto end;
|
||||
}
|
||||
|
||||
ASSERT (pCreateReq->type == TSDB_CHILD_TABLE);
|
||||
if(strcmp(tbName, pCreateReq->name) == 0){
|
||||
schemaLen = *lenTmp;
|
||||
schemaData = *dataTmp;
|
||||
tDecoderClear(&decoderTmp);
|
||||
break;
|
||||
}
|
||||
tDecoderClear(&decoderTmp);
|
||||
}
|
||||
|
||||
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
|
||||
|
||||
SSubmitReq* subReq = NULL;
|
||||
SSubmitBlk* blk = NULL;
|
||||
void* hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId));
|
||||
if (hData) {
|
||||
vgData = *(VgData*)hData;
|
||||
|
||||
int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen;
|
||||
void* tmp = taosMemoryRealloc(vgData.data, totalLen);
|
||||
if (tmp == NULL) {
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
vgData.data = tmp;
|
||||
((VgData*)hData)->data = tmp;
|
||||
subReq = (SSubmitReq*)(vgData.data);
|
||||
blk = POINTER_SHIFT(vgData.data, subReq->length);
|
||||
} else {
|
||||
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
|
||||
void* tmp = taosMemoryCalloc(1, totalLen);
|
||||
if (tmp == NULL) {
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
vgData.data = tmp;
|
||||
taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData, sizeof(vgData));
|
||||
subReq = (SSubmitReq*)(vgData.data);
|
||||
subReq->length = sizeof(SSubmitReq);
|
||||
subReq->numOfBlocks = 0;
|
||||
|
||||
blk = POINTER_SHIFT(vgData.data, sizeof(SSubmitReq));
|
||||
}
|
||||
|
||||
// pSW->pSchema should be same as pTableMeta->schema
|
||||
// ASSERT(pSW->nCols == pTableMeta->tableInfo.numOfColumns);
|
||||
uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
|
||||
uint64_t uid = pTableMeta->uid;
|
||||
int16_t sver = pTableMeta->sversion;
|
||||
|
||||
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
|
||||
if(schemaData){
|
||||
memcpy(blkSchema, schemaData, schemaLen);
|
||||
}
|
||||
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
|
||||
|
||||
SRowBuilder rb = {0};
|
||||
tdSRowInit(&rb, sver);
|
||||
tdSRowSetTpInfo(&rb, pTableMeta->tableInfo.numOfColumns, fLen);
|
||||
int32_t totalLen = 0;
|
||||
|
||||
SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
for (int i = 0; i < pSW->nCols; i++) {
|
||||
SSchema* schema = &pSW->pSchema[i];
|
||||
taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t));
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < rows; j++) {
|
||||
tdSRowResetBuf(&rb, rowData);
|
||||
|
||||
doSetOneRowPtr(&rspObj.resInfo);
|
||||
rspObj.resInfo.current += 1;
|
||||
|
||||
int32_t offset = 0;
|
||||
for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) {
|
||||
const SSchema* pColumn = &pTableMeta->schema[k];
|
||||
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
|
||||
if (!index) {
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
|
||||
} else {
|
||||
char* colData = rspObj.resInfo.row[*index];
|
||||
if (!colData) {
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
|
||||
} else {
|
||||
if (IS_VAR_DATA_TYPE(pColumn->type)) {
|
||||
colData -= VARSTR_HEADER_SIZE;
|
||||
}
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k);
|
||||
}
|
||||
}
|
||||
|
||||
offset += TYPE_BYTES[pColumn->type];
|
||||
}
|
||||
tdSRowEnd(&rb);
|
||||
int32_t rowLen = TD_ROW_LEN(rowData);
|
||||
rowData = POINTER_SHIFT(rowData, rowLen);
|
||||
totalLen += rowLen;
|
||||
}
|
||||
|
||||
taosHashCleanup(schemaHash);
|
||||
blk->uid = htobe64(uid);
|
||||
blk->suid = htobe64(suid);
|
||||
blk->sversion = htonl(sver);
|
||||
blk->schemaLen = htonl(schemaLen);
|
||||
blk->numOfRows = htonl(rows);
|
||||
blk->dataLen = htonl(totalLen);
|
||||
subReq->length += sizeof(SSubmitBlk) + schemaLen + totalLen;
|
||||
subReq->numOfBlocks++;
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
}
|
||||
|
||||
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
|
||||
if (NULL == pQuery) {
|
||||
uError("create SQuery error");
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||
pQuery->haveResultSet = false;
|
||||
pQuery->msgType = TDMT_VND_SUBMIT;
|
||||
pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
||||
if (NULL == pQuery->pRoot) {
|
||||
uError("create pQuery->pRoot error");
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
||||
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
|
||||
|
||||
int32_t numOfVg = taosHashGetSize(pVgHash);
|
||||
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
|
||||
|
||||
VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL);
|
||||
while (vData) {
|
||||
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
||||
if (NULL == dst) {
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
dst->vg = vData->vg;
|
||||
SSubmitReq* subReq = (SSubmitReq*)(vData->data);
|
||||
dst->numOfTables = subReq->numOfBlocks;
|
||||
dst->size = subReq->length;
|
||||
dst->pData = (char*)subReq;
|
||||
vData->data = NULL; // no need free
|
||||
subReq->header.vgId = htonl(dst->vg.vgId);
|
||||
subReq->version = htonl(1);
|
||||
subReq->header.contLen = htonl(subReq->length);
|
||||
subReq->length = htonl(subReq->length);
|
||||
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
|
||||
taosArrayPush(nodeStmt->pDataBlocks, &dst);
|
||||
vData = (VgData*)taosHashIterate(pVgHash, vData);
|
||||
}
|
||||
|
||||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||
code = pRequest->code;
|
||||
|
||||
end:
|
||||
tDecoderClear(&decoder);
|
||||
qDestroyQuery(pQuery);
|
||||
destroyRequest(pRequest);
|
||||
taosHashCleanup(pVgHash);
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
return code;
|
||||
}
|
||||
|
||||
char* tmq_get_json_meta(TAOS_RES* res) {
|
||||
if (!TD_RES_TMQ_META(res)) {
|
||||
if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if(TD_RES_TMQ_METADATA(res)){
|
||||
SMqTaosxRspObj* pMetaDataRspObj = (SMqTaosxRspObj*)res;
|
||||
return processAutoCreateTable(&pMetaDataRspObj->rsp);
|
||||
}
|
||||
|
||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
|
||||
if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB) {
|
||||
return processCreateStb(&pMetaRspObj->metaRsp);
|
||||
|
@ -1638,6 +1961,25 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
|
|||
raw->raw = buf;
|
||||
raw->raw_len = len;
|
||||
raw->raw_type = RES_TYPE__TMQ;
|
||||
} else if (TD_RES_TMQ_METADATA(res)) {
|
||||
SMqTaosxRspObj* rspObj = ((SMqTaosxRspObj*)res);
|
||||
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
tEncodeSize(tEncodeSTaosxRsp, &rspObj->rsp, len, code);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
void* buf = taosMemoryCalloc(1, len);
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, len);
|
||||
tEncodeSTaosxRsp(&encoder, &rspObj->rsp);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
raw->raw = buf;
|
||||
raw->raw_len = len;
|
||||
raw->raw_type = RES_TYPE__TMQ_METADATA;
|
||||
} else {
|
||||
return TSDB_CODE_TMQ_INVALID_MSG;
|
||||
}
|
||||
|
@ -1671,6 +2013,8 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
|
|||
return taosDeleteData(taos, raw.raw, raw.raw_len);
|
||||
} else if (raw.raw_type == RES_TYPE__TMQ) {
|
||||
return tmqWriteRawDataImpl(taos, raw.raw, raw.raw_len);
|
||||
} else if (raw.raw_type == RES_TYPE__TMQ_METADATA) {
|
||||
return tmqWriteRaqMetaDataImpl(taos, raw.raw, raw.raw_len);
|
||||
}
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
|
|
@ -1465,7 +1465,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
|||
|
||||
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
|
||||
pRspObj->resType = RES_TYPE__TAOSX;
|
||||
pRspObj->resType = RES_TYPE__TMQ_METADATA;
|
||||
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
|
||||
pRspObj->vgId = pWrapper->vgHandle->vgId;
|
||||
|
@ -1649,7 +1649,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
continue;
|
||||
}
|
||||
// build rsp
|
||||
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
|
||||
SMqTaosxRspObj* pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
|
||||
taosFreeQitem(pollRspWrapper);
|
||||
return pRsp;
|
||||
} else {
|
||||
|
@ -1769,11 +1769,11 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) {
|
|||
} else if (TD_RES_TMQ_META(res)) {
|
||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
|
||||
if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) {
|
||||
return TMQ_RES_TAOSX;
|
||||
return TMQ_RES_DATA;
|
||||
}
|
||||
return TMQ_RES_TABLE_META;
|
||||
} else if (TD_RES_TMQ_TAOSX(res)) {
|
||||
return TMQ_RES_DATA;
|
||||
} else if (TD_RES_TMQ_METADATA(res)) {
|
||||
return TMQ_RES_METADATA;
|
||||
} else {
|
||||
return TMQ_RES_INVALID;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue