Merge branch '3.0' into feature/TD-18581-D
This commit is contained in:
commit
b5caac4d6a
|
@ -254,7 +254,7 @@ enum tmq_res_t {
|
|||
TMQ_RES_INVALID = -1,
|
||||
TMQ_RES_DATA = 1,
|
||||
TMQ_RES_TABLE_META = 2,
|
||||
TMQ_RES_TAOSX = 3,
|
||||
TMQ_RES_METADATA = 3,
|
||||
};
|
||||
|
||||
typedef struct tmq_raw_data {
|
||||
|
|
|
@ -52,7 +52,7 @@ enum {
|
|||
RES_TYPE__QUERY = 1,
|
||||
RES_TYPE__TMQ,
|
||||
RES_TYPE__TMQ_META,
|
||||
RES_TYPE__TAOSX,
|
||||
RES_TYPE__TMQ_METADATA,
|
||||
};
|
||||
|
||||
#define SHOW_VARIABLES_RESULT_COLS 2
|
||||
|
@ -60,9 +60,9 @@ enum {
|
|||
#define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
|
||||
|
||||
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
|
||||
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ || *(int8_t*)res == RES_TYPE__TAOSX)
|
||||
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
|
||||
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META)
|
||||
#define TD_RES_TMQ_TAOSX(res) (*(int8_t*)res == RES_TYPE__TAOSX)
|
||||
#define TD_RES_TMQ_METADATA(res) (*(int8_t*)res == RES_TYPE__TMQ_METADATA)
|
||||
|
||||
typedef struct SAppInstInfo SAppInstInfo;
|
||||
|
||||
|
|
|
@ -189,6 +189,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
|
|||
tscError("%d failed to add to request container, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self,
|
||||
(*pRequest)->requestId, pTscObj->id, sql);
|
||||
|
||||
taosMemoryFree(param);
|
||||
destroyRequest(*pRequest);
|
||||
*pRequest = NULL;
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
|
|
|
@ -148,7 +148,7 @@ int taos_errno(TAOS_RES *res) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
if (TD_RES_TMQ(res)) {
|
||||
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -162,7 +162,7 @@ const char *taos_errstr(TAOS_RES *res) {
|
|||
return (const char *)tstrerror(terrno);
|
||||
}
|
||||
|
||||
if (TD_RES_TMQ(res)) {
|
||||
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
return "success";
|
||||
}
|
||||
|
||||
|
@ -184,7 +184,7 @@ void taos_free_result(TAOS_RES *res) {
|
|||
SRequestObj *pRequest = (SRequestObj *)res;
|
||||
tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId);
|
||||
destroyRequest(pRequest);
|
||||
} else if (TD_RES_TMQ_TAOSX(res)) {
|
||||
} else if (TD_RES_TMQ_METADATA(res)) {
|
||||
SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res;
|
||||
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
|
||||
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen);
|
||||
|
@ -264,7 +264,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
|||
return doFetchRows(pRequest, true, true);
|
||||
#endif
|
||||
|
||||
} else if (TD_RES_TMQ(res)) {
|
||||
} else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
SMqRspObj *msg = ((SMqRspObj *)res);
|
||||
SReqResultInfo *pResultInfo;
|
||||
if (msg->resIter == -1) {
|
||||
|
@ -437,7 +437,7 @@ const char *taos_data_type(int type) {
|
|||
const char *taos_get_client_info() { return version; }
|
||||
|
||||
int taos_affected_rows(TAOS_RES *res) {
|
||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res)) {
|
||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -454,7 +454,7 @@ int taos_result_precision(TAOS_RES *res) {
|
|||
if (TD_RES_QUERY(res)) {
|
||||
SRequestObj *pRequest = (SRequestObj *)res;
|
||||
return pRequest->body.resInfo.precision;
|
||||
} else if (TD_RES_TMQ(res)) {
|
||||
} else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
SReqResultInfo *info = tmqGetCurResInfo(res);
|
||||
return info->precision;
|
||||
}
|
||||
|
@ -487,7 +487,7 @@ int taos_select_db(TAOS *taos, const char *db) {
|
|||
}
|
||||
|
||||
void taos_stop_query(TAOS_RES *res) {
|
||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res)) {
|
||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -559,7 +559,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
|
|||
(*rows) = pResultInfo->row;
|
||||
(*numOfRows) = pResultInfo->numOfRows;
|
||||
return pRequest->code;
|
||||
} else if (TD_RES_TMQ(res)) {
|
||||
} else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, true);
|
||||
if (pResultInfo == NULL) return -1;
|
||||
|
||||
|
@ -578,7 +578,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (TD_RES_TMQ(res)) {
|
||||
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, false);
|
||||
if (pResultInfo == NULL) {
|
||||
(*numOfRows) = 0;
|
||||
|
|
|
@ -30,7 +30,7 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch
|
|||
char* string = NULL;
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
return string;
|
||||
return NULL;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("create");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
|
@ -39,10 +39,10 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch
|
|||
// sprintf(uid, "%"PRIi64, id);
|
||||
// cJSON* id_ = cJSON_CreateString(uid);
|
||||
// cJSON_AddItemToObject(json, "id", id_);
|
||||
cJSON* tableName = cJSON_CreateString(name);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
cJSON* tableName = cJSON_CreateString(name);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
// cJSON* version = cJSON_CreateNumber(1);
|
||||
// cJSON_AddItemToObject(json, "version", version);
|
||||
|
||||
|
@ -112,10 +112,10 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
|
|||
// cJSON_AddItemToObject(json, "uid", uid);
|
||||
SName name = {0};
|
||||
tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
cJSON* tableName = cJSON_CreateString(name.tname);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* tableType = cJSON_CreateString("super");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
cJSON* tableName = cJSON_CreateString(name.tname);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
|
||||
cJSON* alterType = cJSON_CreateNumber(req.alterType);
|
||||
cJSON_AddItemToObject(json, "alterType", alterType);
|
||||
|
@ -199,8 +199,6 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
|
|||
goto _err;
|
||||
}
|
||||
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
|
||||
tDecoderClear(&coder);
|
||||
return string;
|
||||
|
||||
_err:
|
||||
tDecoderClear(&coder);
|
||||
|
@ -221,32 +219,22 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) {
|
|||
goto _err;
|
||||
}
|
||||
string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen);
|
||||
tDecoderClear(&coder);
|
||||
return string;
|
||||
|
||||
_err:
|
||||
tDecoderClear(&coder);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id, uint8_t tagNum) {
|
||||
char* string = NULL;
|
||||
SArray* pTagVals = NULL;
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
return string;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("create");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
// char cid[32] = {0};
|
||||
// sprintf(cid, "%"PRIi64, id);
|
||||
// cJSON* cid_ = cJSON_CreateString(cid);
|
||||
// cJSON_AddItemToObject(json, "id", cid_);
|
||||
static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq){
|
||||
STag* pTag = (STag*)pCreateReq->ctb.pTag;
|
||||
char* sname = pCreateReq->ctb.name;
|
||||
char* name = pCreateReq->name;
|
||||
SArray* tagName = pCreateReq->ctb.tagName;
|
||||
int64_t id = pCreateReq->uid;
|
||||
uint8_t tagNum = pCreateReq->ctb.tagNum;
|
||||
|
||||
cJSON* tableName = cJSON_CreateString(name);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* tableType = cJSON_CreateString("child");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
cJSON* using = cJSON_CreateString(sname);
|
||||
cJSON_AddItemToObject(json, "using", using);
|
||||
cJSON* tagNumJson = cJSON_CreateNumber(tagNum);
|
||||
|
@ -255,6 +243,7 @@ static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray*
|
|||
// cJSON_AddItemToObject(json, "version", version);
|
||||
|
||||
cJSON* tags = cJSON_CreateArray();
|
||||
SArray* pTagVals = NULL;
|
||||
int32_t code = tTagToValArray(pTag, &pTagVals);
|
||||
if (code) {
|
||||
goto end;
|
||||
|
@ -313,11 +302,37 @@ static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray*
|
|||
cJSON_AddItemToArray(tags, tag);
|
||||
}
|
||||
|
||||
end:
|
||||
end:
|
||||
cJSON_AddItemToObject(json, "tags", tags);
|
||||
taosArrayDestroy(pTagVals);
|
||||
}
|
||||
|
||||
static char* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) {
|
||||
char* string = NULL;
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("create");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
// char cid[32] = {0};
|
||||
// sprintf(cid, "%"PRIi64, id);
|
||||
// cJSON* cid_ = cJSON_CreateString(cid);
|
||||
// cJSON_AddItemToObject(json, "id", cid_);
|
||||
|
||||
cJSON* tableType = cJSON_CreateString("child");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
|
||||
buildChildElement(json, pCreateReq);
|
||||
cJSON* createList = cJSON_CreateArray();
|
||||
for(int i = 0; nReqs > 1 && i < nReqs; i++){
|
||||
cJSON* create = cJSON_CreateObject();
|
||||
buildChildElement(create, pCreateReq + i);
|
||||
cJSON_AddItemToArray(createList, create);
|
||||
}
|
||||
cJSON_AddItemToObject(json, "createList", createList);
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
cJSON_Delete(json);
|
||||
taosArrayDestroy(pTagVals);
|
||||
return string;
|
||||
}
|
||||
|
||||
|
@ -335,24 +350,61 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
|
|||
}
|
||||
|
||||
// loop to create table
|
||||
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||
pCreateReq = req.pReqs + iReq;
|
||||
if (req.nReqs > 0) {
|
||||
pCreateReq = req.pReqs;
|
||||
if (pCreateReq->type == TSDB_CHILD_TABLE) {
|
||||
string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name,
|
||||
pCreateReq->ctb.tagName, pCreateReq->uid, pCreateReq->ctb.tagNum);
|
||||
string = buildCreateCTableJson(req.pReqs, req.nReqs);
|
||||
} else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
|
||||
string =
|
||||
buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
|
||||
string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
|
||||
}
|
||||
}
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
_exit:
|
||||
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||
pCreateReq = req.pReqs + iReq;
|
||||
taosMemoryFreeClear(pCreateReq->comment);
|
||||
if (pCreateReq->type == TSDB_CHILD_TABLE) {
|
||||
taosArrayDestroy(pCreateReq->ctb.tagName);
|
||||
}
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char* processAutoCreateTable(STaosxRsp* rsp) {
|
||||
ASSERT(rsp->createTableNum != 0);
|
||||
|
||||
SDecoder* decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder));
|
||||
SVCreateTbReq* pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq));
|
||||
char* string = NULL;
|
||||
|
||||
// loop to create table
|
||||
for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) {
|
||||
// decode
|
||||
void** data = taosArrayGet(rsp->createTableReq, iReq);
|
||||
int32_t *len = taosArrayGet(rsp->createTableLen, iReq);
|
||||
tDecoderInit(&decoder[iReq], *data, *len);
|
||||
if (tDecodeSVCreateTbReq(&decoder[iReq], pCreateReq + iReq) < 0) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
ASSERT(pCreateReq[iReq].type == TSDB_CHILD_TABLE);
|
||||
}
|
||||
string = buildCreateCTableJson(pCreateReq, rsp->createTableNum);
|
||||
|
||||
_exit:
|
||||
for(int i = 0; i < rsp->createTableNum; i++){
|
||||
tDecoderClear(&decoder[i]);
|
||||
taosMemoryFreeClear(pCreateReq[i].comment);
|
||||
if (pCreateReq[i].type == TSDB_CHILD_TABLE) {
|
||||
taosArrayDestroy(pCreateReq[i].ctb.tagName);
|
||||
}
|
||||
}
|
||||
taosMemoryFree(decoder);
|
||||
taosMemoryFree(pCreateReq);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char* processAlterTable(SMqMetaRsp* metaRsp) {
|
||||
SDecoder decoder = {0};
|
||||
SVAlterTbReq vAlterTbReq = {0};
|
||||
|
@ -374,10 +426,10 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
|
|||
cJSON_AddItemToObject(json, "type", type);
|
||||
// cJSON* uid = cJSON_CreateNumber(id);
|
||||
// cJSON_AddItemToObject(json, "uid", uid);
|
||||
cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
|
||||
cJSON_AddItemToObject(json, "alterType", alterType);
|
||||
|
||||
|
@ -462,6 +514,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
|
|||
string = cJSON_PrintUnformatted(json);
|
||||
|
||||
_exit:
|
||||
cJSON_Delete(json);
|
||||
tDecoderClear(&decoder);
|
||||
return string;
|
||||
}
|
||||
|
@ -485,14 +538,15 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
|
|||
}
|
||||
cJSON* type = cJSON_CreateString("drop");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
cJSON* tableName = cJSON_CreateString(req.name);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* tableType = cJSON_CreateString("super");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
cJSON* tableName = cJSON_CreateString(req.name);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
|
||||
_exit:
|
||||
cJSON_Delete(json);
|
||||
tDecoderClear(&decoder);
|
||||
return string;
|
||||
}
|
||||
|
@ -533,6 +587,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
|
|||
string = cJSON_PrintUnformatted(json);
|
||||
|
||||
_exit:
|
||||
cJSON_Delete(json);
|
||||
tDecoderClear(&decoder);
|
||||
return string;
|
||||
}
|
||||
|
@ -549,6 +604,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
goto end;
|
||||
}
|
||||
|
||||
pRequest->syncQuery = true;
|
||||
if (!pRequest->pDb) {
|
||||
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||
goto end;
|
||||
|
@ -637,6 +693,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
goto end;
|
||||
}
|
||||
|
||||
pRequest->syncQuery = true;
|
||||
if (!pRequest->pDb) {
|
||||
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||
goto end;
|
||||
|
@ -717,6 +774,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
goto end;
|
||||
}
|
||||
|
||||
pRequest->syncQuery = true;
|
||||
if (!pRequest->pDb) {
|
||||
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||
goto end;
|
||||
|
@ -830,6 +888,14 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
code = pRequest->code;
|
||||
|
||||
end:
|
||||
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||
pCreateReq = req.pReqs + iReq;
|
||||
taosMemoryFreeClear(pCreateReq->comment);
|
||||
if (pCreateReq->type == TSDB_CHILD_TABLE) {
|
||||
taosArrayDestroy(pCreateReq->ctb.tagName);
|
||||
}
|
||||
}
|
||||
|
||||
taosHashCleanup(pVgroupHashmap);
|
||||
destroyRequest(pRequest);
|
||||
tDecoderClear(&coder);
|
||||
|
@ -860,7 +926,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
pRequest->syncQuery = true;
|
||||
if (!pRequest->pDb) {
|
||||
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||
goto end;
|
||||
|
@ -1033,6 +1099,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
goto end;
|
||||
}
|
||||
|
||||
pRequest->syncQuery = true;
|
||||
if (!pRequest->pDb) {
|
||||
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||
goto end;
|
||||
|
@ -1152,6 +1219,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
|
|||
goto end;
|
||||
}
|
||||
|
||||
pRequest->syncQuery = true;
|
||||
if (!pRequest->pDb) {
|
||||
uError("WriteRaw:not use db");
|
||||
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||
|
@ -1339,6 +1407,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
pRequest->syncQuery = true;
|
||||
rspObj.resIter = -1;
|
||||
rspObj.resType = RES_TYPE__TMQ;
|
||||
|
||||
|
@ -1529,6 +1598,8 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
|||
subReq->length += sizeof(SSubmitBlk) + schemaLen + totalLen;
|
||||
subReq->numOfBlocks++;
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
rspObj.resInfo.pRspMsg = NULL;
|
||||
doFreeReqResultInfo(&rspObj.resInfo);
|
||||
}
|
||||
|
||||
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
|
||||
|
@ -1578,6 +1649,313 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
|||
code = pRequest->code;
|
||||
|
||||
end:
|
||||
tDeleteSMqDataRsp(&rspObj.rsp);
|
||||
rspObj.resInfo.pRspMsg = NULL;
|
||||
doFreeReqResultInfo(&rspObj.resInfo);
|
||||
tDecoderClear(&decoder);
|
||||
qDestroyQuery(pQuery);
|
||||
destroyRequest(pRequest);
|
||||
taosHashCleanup(pVgHash);
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SHashObj* pVgHash = NULL;
|
||||
SQuery* pQuery = NULL;
|
||||
SMqTaosxRspObj rspObj = {0};
|
||||
SDecoder decoder = {0};
|
||||
STableMeta* pTableMeta = NULL;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
|
||||
if (!pRequest) {
|
||||
uError("WriteRaw:createRequest error request is null");
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pRequest->syncQuery = true;
|
||||
rspObj.resIter = -1;
|
||||
rspObj.resType = RES_TYPE__TMQ_METADATA;
|
||||
|
||||
tDecoderInit(&decoder, data, dataLen);
|
||||
code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp);
|
||||
if (code != 0) {
|
||||
uError("WriteRaw:decode smqDataRsp error");
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (!pRequest->pDb) {
|
||||
uError("WriteRaw:not use db");
|
||||
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||
goto end;
|
||||
}
|
||||
|
||||
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
taosHashSetFreeFp(pVgHash, destroyVgHash);
|
||||
struct SCatalog* pCatalog = NULL;
|
||||
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("WriteRaw: get gatlog error");
|
||||
goto end;
|
||||
}
|
||||
|
||||
SRequestConnInfo conn = {0};
|
||||
conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
|
||||
conn.requestId = pRequest->requestId;
|
||||
conn.requestObjRefId = pRequest->self;
|
||||
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
|
||||
|
||||
printf("raw data block num:%d\n", rspObj.rsp.blockNum);
|
||||
while (++rspObj.resIter < rspObj.rsp.blockNum) {
|
||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
|
||||
if (!rspObj.rsp.withSchema) {
|
||||
uError("WriteRaw:no schema, iter:%d", rspObj.resIter);
|
||||
goto end;
|
||||
}
|
||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
|
||||
setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols);
|
||||
|
||||
code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("WriteRaw: setQueryResultFromRsp error");
|
||||
goto end;
|
||||
}
|
||||
|
||||
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
|
||||
if (!tbName) {
|
||||
uError("WriteRaw: tbname is null");
|
||||
code = TSDB_CODE_TMQ_INVALID_MSG;
|
||||
goto end;
|
||||
}
|
||||
|
||||
printf("raw data tbname:%s\n", tbName);
|
||||
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
|
||||
strcpy(pName.dbname, pRequest->pDb);
|
||||
strcpy(pName.tname, tbName);
|
||||
|
||||
VgData vgData = {0};
|
||||
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &(vgData.vg));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
|
||||
goto end;
|
||||
}
|
||||
|
||||
// find schema data info
|
||||
int32_t schemaLen = 0;
|
||||
void* schemaData = NULL;
|
||||
for(int j = 0; j < rspObj.rsp.createTableNum; j++){
|
||||
void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j);
|
||||
int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j);
|
||||
|
||||
SDecoder decoderTmp = {0};
|
||||
SVCreateTbReq pCreateReq = {0};
|
||||
|
||||
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
|
||||
if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) {
|
||||
tDecoderClear(&decoderTmp);
|
||||
taosMemoryFreeClear(pCreateReq.comment);
|
||||
taosArrayDestroy(pCreateReq.ctb.tagName);
|
||||
goto end;
|
||||
}
|
||||
|
||||
ASSERT (pCreateReq.type == TSDB_CHILD_TABLE);
|
||||
if(strcmp(tbName, pCreateReq.name) == 0){
|
||||
schemaLen = *lenTmp;
|
||||
schemaData = *dataTmp;
|
||||
strcpy(pName.tname, pCreateReq.ctb.name);
|
||||
tDecoderClear(&decoderTmp);
|
||||
taosMemoryFreeClear(pCreateReq.comment);
|
||||
taosArrayDestroy(pCreateReq.ctb.tagName);
|
||||
break;
|
||||
}
|
||||
tDecoderClear(&decoderTmp);
|
||||
taosMemoryFreeClear(pCreateReq.comment);
|
||||
taosArrayDestroy(pCreateReq.ctb.tagName);
|
||||
}
|
||||
|
||||
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
|
||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||
uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName);
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
continue;
|
||||
}
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbName);
|
||||
goto end;
|
||||
}
|
||||
|
||||
uint16_t fLen = 0;
|
||||
int32_t rowSize = 0;
|
||||
int16_t nVar = 0;
|
||||
for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
|
||||
SSchema* schema = &pTableMeta->schema[i];
|
||||
fLen += TYPE_BYTES[schema->type];
|
||||
rowSize += schema->bytes;
|
||||
if (IS_VAR_DATA_TYPE(schema->type)) {
|
||||
nVar++;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t rows = rspObj.resInfo.numOfRows;
|
||||
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
|
||||
(int32_t)TD_BITMAP_BYTES(pTableMeta->tableInfo.numOfColumns - 1);
|
||||
|
||||
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
|
||||
|
||||
SSubmitReq* subReq = NULL;
|
||||
SSubmitBlk* blk = NULL;
|
||||
void* hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId));
|
||||
if (hData) {
|
||||
vgData = *(VgData*)hData;
|
||||
|
||||
int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen;
|
||||
void* tmp = taosMemoryRealloc(vgData.data, totalLen);
|
||||
if (tmp == NULL) {
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
vgData.data = tmp;
|
||||
((VgData*)hData)->data = tmp;
|
||||
subReq = (SSubmitReq*)(vgData.data);
|
||||
blk = POINTER_SHIFT(vgData.data, subReq->length);
|
||||
} else {
|
||||
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
|
||||
void* tmp = taosMemoryCalloc(1, totalLen);
|
||||
if (tmp == NULL) {
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
vgData.data = tmp;
|
||||
taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData, sizeof(vgData));
|
||||
subReq = (SSubmitReq*)(vgData.data);
|
||||
subReq->length = sizeof(SSubmitReq);
|
||||
subReq->numOfBlocks = 0;
|
||||
|
||||
blk = POINTER_SHIFT(vgData.data, sizeof(SSubmitReq));
|
||||
}
|
||||
|
||||
// pSW->pSchema should be same as pTableMeta->schema
|
||||
// ASSERT(pSW->nCols == pTableMeta->tableInfo.numOfColumns);
|
||||
uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
|
||||
uint64_t uid = pTableMeta->uid;
|
||||
int16_t sver = pTableMeta->sversion;
|
||||
|
||||
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
|
||||
if(schemaData){
|
||||
memcpy(blkSchema, schemaData, schemaLen);
|
||||
}
|
||||
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
|
||||
|
||||
SRowBuilder rb = {0};
|
||||
tdSRowInit(&rb, sver);
|
||||
tdSRowSetTpInfo(&rb, pTableMeta->tableInfo.numOfColumns, fLen);
|
||||
int32_t totalLen = 0;
|
||||
|
||||
SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
for (int i = 0; i < pSW->nCols; i++) {
|
||||
SSchema* schema = &pSW->pSchema[i];
|
||||
taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t));
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < rows; j++) {
|
||||
tdSRowResetBuf(&rb, rowData);
|
||||
|
||||
doSetOneRowPtr(&rspObj.resInfo);
|
||||
rspObj.resInfo.current += 1;
|
||||
|
||||
int32_t offset = 0;
|
||||
for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) {
|
||||
const SSchema* pColumn = &pTableMeta->schema[k];
|
||||
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
|
||||
if (!index) {
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
|
||||
} else {
|
||||
char* colData = rspObj.resInfo.row[*index];
|
||||
if (!colData) {
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
|
||||
} else {
|
||||
if (IS_VAR_DATA_TYPE(pColumn->type)) {
|
||||
colData -= VARSTR_HEADER_SIZE;
|
||||
}
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k);
|
||||
}
|
||||
}
|
||||
|
||||
offset += TYPE_BYTES[pColumn->type];
|
||||
}
|
||||
tdSRowEnd(&rb);
|
||||
int32_t rowLen = TD_ROW_LEN(rowData);
|
||||
rowData = POINTER_SHIFT(rowData, rowLen);
|
||||
totalLen += rowLen;
|
||||
}
|
||||
|
||||
taosHashCleanup(schemaHash);
|
||||
blk->uid = htobe64(uid);
|
||||
blk->suid = htobe64(suid);
|
||||
blk->sversion = htonl(sver);
|
||||
blk->schemaLen = htonl(schemaLen);
|
||||
blk->numOfRows = htonl(rows);
|
||||
blk->dataLen = htonl(totalLen);
|
||||
subReq->length += sizeof(SSubmitBlk) + schemaLen + totalLen;
|
||||
subReq->numOfBlocks++;
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
rspObj.resInfo.pRspMsg = NULL;
|
||||
doFreeReqResultInfo(&rspObj.resInfo);
|
||||
}
|
||||
|
||||
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
|
||||
if (NULL == pQuery) {
|
||||
uError("create SQuery error");
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||
pQuery->haveResultSet = false;
|
||||
pQuery->msgType = TDMT_VND_SUBMIT;
|
||||
pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
||||
if (NULL == pQuery->pRoot) {
|
||||
uError("create pQuery->pRoot error");
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
||||
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
|
||||
|
||||
int32_t numOfVg = taosHashGetSize(pVgHash);
|
||||
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
|
||||
|
||||
VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL);
|
||||
while (vData) {
|
||||
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
||||
if (NULL == dst) {
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
dst->vg = vData->vg;
|
||||
SSubmitReq* subReq = (SSubmitReq*)(vData->data);
|
||||
dst->numOfTables = subReq->numOfBlocks;
|
||||
dst->size = subReq->length;
|
||||
dst->pData = (char*)subReq;
|
||||
vData->data = NULL; // no need free
|
||||
subReq->header.vgId = htonl(dst->vg.vgId);
|
||||
subReq->version = htonl(1);
|
||||
subReq->header.contLen = htonl(subReq->length);
|
||||
subReq->length = htonl(subReq->length);
|
||||
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
|
||||
taosArrayPush(nodeStmt->pDataBlocks, &dst);
|
||||
vData = (VgData*)taosHashIterate(pVgHash, vData);
|
||||
}
|
||||
|
||||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||
code = pRequest->code;
|
||||
|
||||
end:
|
||||
tDeleteSTaosxRsp(&rspObj.rsp);
|
||||
rspObj.resInfo.pRspMsg = NULL;
|
||||
doFreeReqResultInfo(&rspObj.resInfo);
|
||||
tDecoderClear(&decoder);
|
||||
qDestroyQuery(pQuery);
|
||||
destroyRequest(pRequest);
|
||||
|
@ -1587,10 +1965,15 @@ end:
|
|||
}
|
||||
|
||||
char* tmq_get_json_meta(TAOS_RES* res) {
|
||||
if (!TD_RES_TMQ_META(res)) {
|
||||
if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if(TD_RES_TMQ_METADATA(res)){
|
||||
SMqTaosxRspObj* pMetaDataRspObj = (SMqTaosxRspObj*)res;
|
||||
return processAutoCreateTable(&pMetaDataRspObj->rsp);
|
||||
}
|
||||
|
||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
|
||||
if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB) {
|
||||
return processCreateStb(&pMetaRspObj->metaRsp);
|
||||
|
@ -1638,6 +2021,25 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
|
|||
raw->raw = buf;
|
||||
raw->raw_len = len;
|
||||
raw->raw_type = RES_TYPE__TMQ;
|
||||
} else if (TD_RES_TMQ_METADATA(res)) {
|
||||
SMqTaosxRspObj* rspObj = ((SMqTaosxRspObj*)res);
|
||||
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
tEncodeSize(tEncodeSTaosxRsp, &rspObj->rsp, len, code);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
void* buf = taosMemoryCalloc(1, len);
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, len);
|
||||
tEncodeSTaosxRsp(&encoder, &rspObj->rsp);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
raw->raw = buf;
|
||||
raw->raw_len = len;
|
||||
raw->raw_type = RES_TYPE__TMQ_METADATA;
|
||||
} else {
|
||||
return TSDB_CODE_TMQ_INVALID_MSG;
|
||||
}
|
||||
|
@ -1645,7 +2047,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
|
|||
}
|
||||
|
||||
void tmq_free_raw(tmq_raw_data raw) {
|
||||
if (raw.raw_type == RES_TYPE__TMQ) {
|
||||
if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
|
||||
taosMemoryFree(raw.raw);
|
||||
}
|
||||
}
|
||||
|
@ -1671,6 +2073,8 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
|
|||
return taosDeleteData(taos, raw.raw, raw.raw_len);
|
||||
} else if (raw.raw_type == RES_TYPE__TMQ) {
|
||||
return tmqWriteRawDataImpl(taos, raw.raw, raw.raw_len);
|
||||
} else if (raw.raw_type == RES_TYPE__TMQ_METADATA) {
|
||||
return tmqWriteRawMetaDataImpl(taos, raw.raw, raw.raw_len);
|
||||
}
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
|
|
@ -515,6 +515,10 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm
|
|||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
|
||||
topic = pMetaRspObj->topic;
|
||||
vgId = pMetaRspObj->vgId;
|
||||
} else if(TD_RES_TMQ_METADATA(msg)) {
|
||||
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
|
||||
topic = pRspObj->topic;
|
||||
vgId = pRspObj->vgId;
|
||||
} else {
|
||||
return TSDB_CODE_TMQ_INVALID_MSG;
|
||||
}
|
||||
|
@ -1471,16 +1475,16 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
|||
|
||||
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
|
||||
pRspObj->resType = RES_TYPE__TAOSX;
|
||||
pRspObj->resType = RES_TYPE__TMQ_METADATA;
|
||||
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
|
||||
pRspObj->vgId = pWrapper->vgHandle->vgId;
|
||||
pRspObj->resIter = -1;
|
||||
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqTaosxRspObj));
|
||||
memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));
|
||||
|
||||
pRspObj->resInfo.totalRows = 0;
|
||||
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
|
||||
if (!pWrapper->dataRsp.withSchema) {
|
||||
if (!pWrapper->taosxRsp.withSchema) {
|
||||
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
|
||||
}
|
||||
|
||||
|
@ -1654,8 +1658,14 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
rspWrapper = NULL;
|
||||
continue;
|
||||
}
|
||||
|
||||
// build rsp
|
||||
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
|
||||
void* pRsp = NULL;
|
||||
if(pollRspWrapper->taosxRsp.createTableNum == 0){
|
||||
pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
|
||||
}else{
|
||||
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
|
||||
}
|
||||
taosFreeQitem(pollRspWrapper);
|
||||
return pRsp;
|
||||
} else {
|
||||
|
@ -1775,11 +1785,11 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) {
|
|||
} else if (TD_RES_TMQ_META(res)) {
|
||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
|
||||
if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) {
|
||||
return TMQ_RES_TAOSX;
|
||||
return TMQ_RES_DATA;
|
||||
}
|
||||
return TMQ_RES_TABLE_META;
|
||||
} else if (TD_RES_TMQ_TAOSX(res)) {
|
||||
return TMQ_RES_DATA;
|
||||
} else if (TD_RES_TMQ_METADATA(res)) {
|
||||
return TMQ_RES_METADATA;
|
||||
} else {
|
||||
return TMQ_RES_INVALID;
|
||||
}
|
||||
|
@ -1792,6 +1802,9 @@ const char* tmq_get_topic_name(TAOS_RES* res) {
|
|||
} else if (TD_RES_TMQ_META(res)) {
|
||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
|
||||
return strchr(pMetaRspObj->topic, '.') + 1;
|
||||
} else if (TD_RES_TMQ_METADATA(res)) {
|
||||
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
|
||||
return strchr(pRspObj->topic, '.') + 1;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -1804,6 +1817,9 @@ const char* tmq_get_db_name(TAOS_RES* res) {
|
|||
} else if (TD_RES_TMQ_META(res)) {
|
||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
|
||||
return strchr(pMetaRspObj->db, '.') + 1;
|
||||
} else if (TD_RES_TMQ_METADATA(res)) {
|
||||
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
|
||||
return strchr(pRspObj->db, '.') + 1;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -1816,6 +1832,9 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
|
|||
} else if (TD_RES_TMQ_META(res)) {
|
||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
|
||||
return pMetaRspObj->vgId;
|
||||
} else if (TD_RES_TMQ_METADATA(res)) {
|
||||
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
|
||||
return pRspObj->vgId;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
|
@ -1829,6 +1848,13 @@ const char* tmq_get_table_name(TAOS_RES* res) {
|
|||
return NULL;
|
||||
}
|
||||
return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
|
||||
} else if (TD_RES_TMQ_METADATA(res)) {
|
||||
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
|
||||
if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
|
||||
pRspObj->resIter >= pRspObj->rsp.blockNum) {
|
||||
return NULL;
|
||||
}
|
||||
return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -6018,12 +6018,18 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
|
|||
|
||||
void tDeleteSTaosxRsp(STaosxRsp *pRsp) {
|
||||
taosArrayDestroy(pRsp->blockDataLen);
|
||||
pRsp->blockDataLen = NULL;
|
||||
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
|
||||
pRsp->blockData = NULL;
|
||||
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||
pRsp->blockSchema = NULL;
|
||||
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
|
||||
pRsp->blockTbName = NULL;
|
||||
|
||||
taosArrayDestroy(pRsp->createTableLen);
|
||||
pRsp->createTableLen = NULL;
|
||||
taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree);
|
||||
pRsp->createTableReq = NULL;
|
||||
}
|
||||
|
||||
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {
|
||||
|
|
|
@ -540,7 +540,7 @@ typedef struct {
|
|||
} SMqConsumerEp;
|
||||
|
||||
SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp);
|
||||
void tDeleteSMqConsumerEp(SMqConsumerEp* pEp);
|
||||
void tDeleteSMqConsumerEp(void* pEp);
|
||||
int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp);
|
||||
void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp);
|
||||
|
||||
|
|
|
@ -197,11 +197,12 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
|||
SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
|
||||
|
||||
pLostMsg->consumerId = pConsumer->consumerId;
|
||||
SRpcMsg *pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
||||
pRpcMsg->msgType = TDMT_MND_MQ_CONSUMER_LOST;
|
||||
pRpcMsg->pCont = pLostMsg;
|
||||
pRpcMsg->contLen = sizeof(SMqConsumerLostMsg);
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg);
|
||||
SRpcMsg pRpcMsg = {
|
||||
.msgType = TDMT_MND_MQ_CONSUMER_LOST,
|
||||
.pCont = pLostMsg,
|
||||
.contLen = sizeof(SMqConsumerLostMsg),
|
||||
};
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
|
||||
}
|
||||
if (status == MQ_CONSUMER_STATUS__LOST_REBD || status == MQ_CONSUMER_STATUS__READY) {
|
||||
// do nothing
|
||||
|
@ -280,11 +281,12 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
|||
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
|
||||
|
||||
pRecoverMsg->consumerId = consumerId;
|
||||
SRpcMsg *pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
||||
pRpcMsg->msgType = TDMT_MND_MQ_CONSUMER_RECOVER;
|
||||
pRpcMsg->pCont = pRecoverMsg;
|
||||
pRpcMsg->contLen = sizeof(SMqConsumerRecoverMsg);
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg);
|
||||
SRpcMsg pRpcMsg = {
|
||||
.msgType = TDMT_MND_MQ_CONSUMER_RECOVER,
|
||||
.pCont = pRecoverMsg,
|
||||
.contLen = sizeof(SMqConsumerRecoverMsg),
|
||||
};
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
|
||||
}
|
||||
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
|
@ -318,11 +320,12 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
|||
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
|
||||
|
||||
pRecoverMsg->consumerId = consumerId;
|
||||
SRpcMsg *pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
||||
pRpcMsg->msgType = TDMT_MND_MQ_CONSUMER_RECOVER;
|
||||
pRpcMsg->pCont = pRecoverMsg;
|
||||
pRpcMsg->contLen = sizeof(SMqConsumerRecoverMsg);
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg);
|
||||
SRpcMsg pRpcMsg = {
|
||||
.msgType = TDMT_MND_MQ_CONSUMER_RECOVER,
|
||||
.pCont = pRecoverMsg,
|
||||
.contLen = sizeof(SMqConsumerRecoverMsg),
|
||||
};
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
|
|
@ -338,8 +338,8 @@ SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
|
|||
return pConsumerEpNew;
|
||||
}
|
||||
|
||||
void tDeleteSMqConsumerEp(SMqConsumerEp *pConsumerEp) {
|
||||
//
|
||||
void tDeleteSMqConsumerEp(void *data) {
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp*)data;
|
||||
taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
|
||||
}
|
||||
|
||||
|
|
|
@ -185,7 +185,7 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
|
|||
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||
|
||||
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
|
||||
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid,
|
||||
const char* stbFullName, SBatchDeleteReq* pDeleteReq);
|
||||
|
||||
// sma
|
||||
|
|
|
@ -319,8 +319,12 @@ _query:
|
|||
pSchema = tCloneSSchemaWrapper(&meNew.stbEntry.schemaRow);
|
||||
tDecoderClear(&dcNew);
|
||||
tdbTbcClose(pCur);
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
goto _exit;
|
||||
}
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
tdbTbcClose(pCur);
|
||||
}
|
||||
} else if (me.type == TSDB_CHILD_TABLE) {
|
||||
|
@ -347,11 +351,13 @@ _query:
|
|||
tDecoderClear(&dc);
|
||||
|
||||
_exit:
|
||||
tDecoderClear(&dc);
|
||||
metaULock(pMeta);
|
||||
tdbFree(pData);
|
||||
return pSchema;
|
||||
|
||||
_err:
|
||||
tDecoderClear(&dc);
|
||||
metaULock(pMeta);
|
||||
tdbFree(pData);
|
||||
return NULL;
|
||||
|
@ -383,10 +389,8 @@ int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) {
|
|||
ttlKey = *(STtlIdxKey *)pKey;
|
||||
taosArrayPush(uidList, &ttlKey.uid);
|
||||
}
|
||||
tdbTbcClose(pCur);
|
||||
|
||||
tdbFree(pKey);
|
||||
|
||||
tdbTbcClose(pCur);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -353,6 +353,8 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t
|
|||
metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version, idData->index);
|
||||
}
|
||||
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
return TDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -528,6 +530,7 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in
|
|||
}
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(pTagVals);
|
||||
}
|
||||
// SIdInfo* sidInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &me.ctbEntry.suid, sizeof(tb_uid_t));
|
||||
// if(sidInfo->version >= idInfo->version){
|
||||
|
|
|
@ -1192,10 +1192,11 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
|
|||
const void *pTagData = NULL; //
|
||||
int32_t nTagData = 0;
|
||||
SDecoder dc = {0};
|
||||
|
||||
int32_t ret = 0;
|
||||
// get super table
|
||||
if (tdbTbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData) != 0) {
|
||||
return -1;
|
||||
ret = -1;
|
||||
goto end;
|
||||
}
|
||||
tbDbKey.uid = pCtbEntry->ctbEntry.suid;
|
||||
tbDbKey.version = ((SUidIdxVal *)pData)[0].version;
|
||||
|
@ -1221,17 +1222,20 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
|
|||
// nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len;
|
||||
pTagData = pCtbEntry->ctbEntry.pTags;
|
||||
nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len;
|
||||
return metaSaveJsonVarToIdx(pMeta, pCtbEntry, pTagColumn);
|
||||
ret = metaSaveJsonVarToIdx(pMeta, pCtbEntry, pTagColumn);
|
||||
goto end;
|
||||
}
|
||||
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type,
|
||||
pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) {
|
||||
return -1;
|
||||
ret = -1;
|
||||
goto end;
|
||||
}
|
||||
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
|
||||
end:
|
||||
metaDestroyTagIdxKey(pTagIdxKey);
|
||||
tDecoderClear(&dc);
|
||||
tdbFree(pData);
|
||||
return 0;
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
|
||||
|
|
|
@ -204,7 +204,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
|
|||
}
|
||||
|
||||
SBatchDeleteReq deleteReq;
|
||||
SSubmitReq *pSubmitReq = tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true,
|
||||
SSubmitReq *pSubmitReq = tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true,
|
||||
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq);
|
||||
|
||||
if (!pSubmitReq) {
|
||||
|
|
|
@ -51,6 +51,20 @@ void tqCleanUp() {
|
|||
}
|
||||
}
|
||||
|
||||
static void destroySTqHandle(void* data) {
|
||||
STqHandle* pData = (STqHandle*)data;
|
||||
qDestroyTask(pData->execHandle.task);
|
||||
if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||
tqCloseReader(pData->execHandle.pExecReader);
|
||||
walCloseReader(pData->pWalReader);
|
||||
taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
|
||||
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE){
|
||||
walCloseReader(pData->pWalReader);
|
||||
tqCloseReader(pData->execHandle.pExecReader);
|
||||
}
|
||||
}
|
||||
|
||||
STQ* tqOpen(const char* path, SVnode* pVnode) {
|
||||
STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
|
||||
if (pTq == NULL) {
|
||||
|
@ -62,6 +76,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
|||
|
||||
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||
|
||||
taosHashSetFreeFp(pTq->pHandle, destroySTqHandle);
|
||||
|
||||
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||
|
||||
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||
|
@ -517,6 +533,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
int64_t fetchVer = fetchOffsetNew.version + 1;
|
||||
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
||||
if (pCkHead == NULL) {
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -577,14 +594,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
|
||||
code = -1;
|
||||
taosMemoryFree(pCkHead);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return code;
|
||||
}
|
||||
code = 0;
|
||||
if (pCkHead) taosMemoryFree(pCkHead);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -243,14 +243,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
|
|||
}
|
||||
if (pHandle->fetchMeta) {
|
||||
SSubmitBlk* pBlk = pReader->pBlock;
|
||||
if (pBlk->schemaLen > 0) {
|
||||
int32_t schemaLen = htonl(pBlk->schemaLen);
|
||||
if (schemaLen > 0) {
|
||||
if (pRsp->createTableNum == 0) {
|
||||
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
|
||||
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
|
||||
}
|
||||
void* createReq = taosMemoryCalloc(1, pBlk->schemaLen);
|
||||
memcpy(createReq, pBlk->data, pBlk->schemaLen);
|
||||
taosArrayPush(pRsp->createTableLen, &pBlk->schemaLen);
|
||||
void* createReq = taosMemoryCalloc(1, schemaLen);
|
||||
memcpy(createReq, pBlk->data, schemaLen);
|
||||
taosArrayPush(pRsp->createTableLen, &schemaLen);
|
||||
taosArrayPush(pRsp->createTableReq, &createReq);
|
||||
pRsp->createTableNum++;
|
||||
}
|
||||
|
@ -277,14 +278,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
|
|||
}
|
||||
if (pHandle->fetchMeta) {
|
||||
SSubmitBlk* pBlk = pReader->pBlock;
|
||||
if (pBlk->schemaLen > 0) {
|
||||
int32_t schemaLen = htonl(pBlk->schemaLen);
|
||||
if (schemaLen > 0) {
|
||||
if (pRsp->createTableNum == 0) {
|
||||
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
|
||||
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
|
||||
}
|
||||
void* createReq = taosMemoryCalloc(1, pBlk->schemaLen);
|
||||
memcpy(createReq, pBlk->data, pBlk->schemaLen);
|
||||
taosArrayPush(pRsp->createTableLen, &pBlk->schemaLen);
|
||||
void* createReq = taosMemoryCalloc(1, schemaLen);
|
||||
memcpy(createReq, pBlk->data, schemaLen);
|
||||
taosArrayPush(pRsp->createTableLen, &schemaLen);
|
||||
taosArrayPush(pRsp->createTableReq, &createReq);
|
||||
pRsp->createTableNum++;
|
||||
}
|
||||
|
|
|
@ -145,6 +145,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
|
|||
ASSERT(0);
|
||||
tqError("write offset incomplete, len %d, write len %" PRId64, bodyLen, writeLen);
|
||||
taosHashCancelIterate(pStore->pHash, pIter);
|
||||
taosMemoryFree(buf);
|
||||
return -1;
|
||||
}
|
||||
taosMemoryFree(buf);
|
||||
|
|
|
@ -80,6 +80,13 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
|
|||
void* buf = taosMemoryMalloc(tlen);
|
||||
if (NULL == buf) {
|
||||
taosArrayDestroy(reqNew.pArray);
|
||||
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||
pCreateReq = req.pReqs + iReq;
|
||||
taosMemoryFreeClear(pCreateReq->comment);
|
||||
if (pCreateReq->type == TSDB_CHILD_TABLE) {
|
||||
taosArrayDestroy(pCreateReq->ctb.tagName);
|
||||
}
|
||||
}
|
||||
goto end;
|
||||
}
|
||||
SEncoder coderNew = {0};
|
||||
|
@ -91,6 +98,14 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
|
|||
taosMemoryFree(buf);
|
||||
taosArrayDestroy(reqNew.pArray);
|
||||
}
|
||||
|
||||
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||
pCreateReq = req.pReqs + iReq;
|
||||
taosMemoryFreeClear(pCreateReq->comment);
|
||||
if (pCreateReq->type == TSDB_CHILD_TABLE) {
|
||||
taosArrayDestroy(pCreateReq->ctb.tagName);
|
||||
}
|
||||
}
|
||||
} else if (msgType == TDMT_VND_ALTER_TABLE) {
|
||||
SVAlterTbReq req = {0};
|
||||
|
||||
|
|
|
@ -48,7 +48,7 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
|
|||
return 0;
|
||||
}
|
||||
|
||||
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb,
|
||||
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, SSchemaWrapper* pTagSchemaWrapper, bool createTb,
|
||||
int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq) {
|
||||
SSubmitReq* ret = NULL;
|
||||
SArray* schemaReqs = NULL;
|
||||
|
@ -89,6 +89,32 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
|
||||
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
|
||||
strcpy(tagNameStr, "group_id");
|
||||
taosArrayPush(tagName, tagNameStr);
|
||||
|
||||
// STag* pTag = NULL;
|
||||
// taosArrayClear(tagArray);
|
||||
// SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
|
||||
// for(int j = 0; j < pTagSchemaWrapper->nCols; j++){
|
||||
// STagVal tagVal = {
|
||||
// .cid = pTagSchemaWrapper->pSchema[j].colId,
|
||||
// .type = pTagSchemaWrapper->pSchema[j].type,
|
||||
// .i64 = (int64_t)pDataBlock->info.groupId,
|
||||
// };
|
||||
// taosArrayPush(tagArray, &tagVal);
|
||||
// taosArrayPush(tagName, pTagSchemaWrapper->pSchema[j].name);
|
||||
// }
|
||||
//
|
||||
// tTagNew(tagArray, 1, false, &pTag);
|
||||
// if (pTag == NULL) {
|
||||
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
// taosArrayDestroy(tagArray);
|
||||
// taosArrayDestroy(tagName);
|
||||
// return NULL;
|
||||
// }
|
||||
|
||||
SVCreateTbReq createTbReq = {0};
|
||||
SName name = {0};
|
||||
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
|
@ -99,6 +125,8 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
|
|||
createTbReq.type = TSDB_CHILD_TABLE;
|
||||
createTbReq.ctb.suid = suid;
|
||||
createTbReq.ctb.pTag = (uint8_t*)pTag;
|
||||
createTbReq.ctb.tagNum = taosArrayGetSize(tagArray);
|
||||
createTbReq.ctb.tagName = tagName;
|
||||
|
||||
int32_t code;
|
||||
int32_t schemaLen;
|
||||
|
@ -113,6 +141,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
|
|||
void* schemaStr = taosMemoryMalloc(schemaLen);
|
||||
if (schemaStr == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
return NULL;
|
||||
}
|
||||
taosArrayPush(schemaReqs, &schemaStr);
|
||||
|
@ -123,6 +152,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
|
|||
code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
|
||||
if (code < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
return NULL;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
@ -231,7 +261,7 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
|||
|
||||
ASSERT(pTask->tbSink.pTSchema);
|
||||
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
||||
SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
|
||||
SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, pTask->tbSink.pSchemaWrapper, true, pTask->tbSink.stbUid,
|
||||
pTask->tbSink.stbFullName, &deleteReq);
|
||||
|
||||
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
|
||||
|
|
|
@ -1342,7 +1342,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) { taosMemoryFree(pCond->colList); }
|
||||
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) { taosMemoryFreeClear(pCond->colList); }
|
||||
|
||||
int32_t convertFillType(int32_t mode) {
|
||||
int32_t type = TSDB_FILL_NONE;
|
||||
|
|
|
@ -876,6 +876,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList,
|
||||
&pInfo->dataReader, NULL);
|
||||
|
||||
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
|
||||
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
|
||||
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
|
||||
pTaskInfo->streamInfo.schema = mtInfo.schema;
|
||||
|
|
|
@ -3350,6 +3350,10 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
|
|||
tDeleteSSchemaWrapper(pSchemaInfo->qsw);
|
||||
}
|
||||
|
||||
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
|
||||
tDeleteSSchemaWrapper(pStreamInfo->schema);
|
||||
}
|
||||
|
||||
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
||||
taosArrayClear(pTableListInfo->pGroupList);
|
||||
SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
|
||||
|
@ -4043,6 +4047,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
|
|||
doDestroyTableList(&pTaskInfo->tableqinfoList);
|
||||
destroyOperatorInfo(pTaskInfo->pRoot);
|
||||
cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
|
||||
cleanupStreamInfo(&pTaskInfo->streamInfo);
|
||||
|
||||
nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
|
||||
|
||||
|
|
|
@ -1780,6 +1780,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
|||
qDebug("tmqsnap change get data uid:%ld", mtInfo.uid);
|
||||
qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
|
||||
}
|
||||
tDeleteSSchemaWrapper(mtInfo.schema);
|
||||
qDebug("tmqsnap stream scan tsdb return null");
|
||||
return NULL;
|
||||
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
|
|
|
@ -1059,7 +1059,7 @@ end:
|
|||
for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) {
|
||||
STagVal* p = (STagVal*)taosArrayGet(pTagVals, i);
|
||||
if (IS_VAR_DATA_TYPE(p->type)) {
|
||||
taosMemoryFree(p->pData);
|
||||
taosMemoryFreeClear(p->pData);
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(pTagVals);
|
||||
|
@ -2040,7 +2040,7 @@ end:
|
|||
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
|
||||
STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
|
||||
if (p->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
taosMemoryFree(p->pData);
|
||||
taosMemoryFreeClear(p->pData);
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(pTagArray);
|
||||
|
|
|
@ -6664,12 +6664,7 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
|
|||
break;
|
||||
}
|
||||
} while (0);
|
||||
for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) {
|
||||
STagVal* p = (STagVal*)taosArrayGet(pTagVals, i);
|
||||
if (IS_VAR_DATA_TYPE(p->type)) {
|
||||
taosMemoryFree(p->pData);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(pTagVals);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
|
|
@ -410,6 +410,12 @@ end:
|
|||
if (retCode == TSDB_CODE_SUCCESS) {
|
||||
tTagNew(pTagVals, 1, true, ppTag);
|
||||
}
|
||||
for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) {
|
||||
STagVal* p = (STagVal*)taosArrayGet(pTagVals, i);
|
||||
if (IS_VAR_DATA_TYPE(p->type)) {
|
||||
taosMemoryFreeClear(p->pData);
|
||||
}
|
||||
}
|
||||
cJSON_Delete(root);
|
||||
return retCode;
|
||||
}
|
||||
|
|
|
@ -632,6 +632,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD
|
|||
|
||||
QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRes));
|
||||
|
||||
taosMemoryFreeClear(req.msg);
|
||||
QW_SCH_TASK_DLOG("processDelete end, node:%p", node);
|
||||
|
||||
_return:
|
||||
|
|
|
@ -31,18 +31,20 @@ class TDTestCase:
|
|||
while True:
|
||||
dst = queryFile.readline()
|
||||
src = consumeFile.readline()
|
||||
|
||||
if dst:
|
||||
if src:
|
||||
if dst != src:
|
||||
tdLog.exit("compare error: %s != %s"%src, dst)
|
||||
tdLog.exit("compare error: %s != %s"%(src, dst))
|
||||
else:
|
||||
break
|
||||
return
|
||||
|
||||
def checkDropData(self):
|
||||
def checkDropData(self, drop):
|
||||
tdSql.execute('use db_taosx')
|
||||
tdSql.query("show tables")
|
||||
tdSql.checkRows(2)
|
||||
if drop:
|
||||
tdSql.checkRows(10)
|
||||
else:
|
||||
tdSql.checkRows(15)
|
||||
tdSql.query("select * from jt order by i")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, 1)
|
||||
|
@ -50,15 +52,72 @@ class TDTestCase:
|
|||
tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}')
|
||||
tdSql.checkData(1, 2, None)
|
||||
|
||||
tdSql.query("select * from sttb order by ts")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, 13)
|
||||
tdSql.checkData(1, 1, 16)
|
||||
tdSql.checkData(0, 2, 22)
|
||||
tdSql.checkData(1, 2, 25)
|
||||
tdSql.checkData(0, 5, "sttb3")
|
||||
tdSql.checkData(1, 5, "sttb4")
|
||||
|
||||
tdSql.query("select * from stt order by ts")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, 1)
|
||||
tdSql.checkData(1, 1, 21)
|
||||
tdSql.checkData(0, 2, 2)
|
||||
tdSql.checkData(1, 2, 21)
|
||||
tdSql.checkData(0, 5, "stt3")
|
||||
tdSql.checkData(1, 5, "stt4")
|
||||
|
||||
tdSql.execute('use abc1')
|
||||
tdSql.query("show tables")
|
||||
tdSql.checkRows(2)
|
||||
if drop:
|
||||
tdSql.checkRows(10)
|
||||
else:
|
||||
tdSql.checkRows(15)
|
||||
tdSql.query("select * from jt order by i")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, 1)
|
||||
tdSql.checkData(1, 1, 11)
|
||||
tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}')
|
||||
tdSql.checkData(1, 2, None)
|
||||
|
||||
tdSql.query("select * from sttb order by ts")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, 13)
|
||||
tdSql.checkData(1, 1, 16)
|
||||
tdSql.checkData(0, 2, 22)
|
||||
tdSql.checkData(1, 2, 25)
|
||||
tdSql.checkData(0, 5, "sttb3")
|
||||
tdSql.checkData(1, 5, "sttb4")
|
||||
|
||||
tdSql.query("select * from stt order by ts")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, 1)
|
||||
tdSql.checkData(1, 1, 21)
|
||||
tdSql.checkData(0, 2, 2)
|
||||
tdSql.checkData(1, 2, 21)
|
||||
tdSql.checkData(0, 5, "stt3")
|
||||
tdSql.checkData(1, 5, "stt4")
|
||||
|
||||
return
|
||||
|
||||
def checkDataTable(self):
|
||||
tdSql.execute('use db_taosx')
|
||||
tdSql.query("select * from meters_summary")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 1, 120)
|
||||
tdSql.checkData(0, 2, 1)
|
||||
tdSql.checkData(0, 3, "San Francisco")
|
||||
|
||||
tdSql.execute('use abc1')
|
||||
tdSql.query("select * from meters_summary")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 1, 120)
|
||||
tdSql.checkData(0, 2, 1)
|
||||
tdSql.checkData(0, 3, "San Francisco")
|
||||
|
||||
return
|
||||
|
||||
def checkData(self):
|
||||
|
@ -145,6 +204,19 @@ class TDTestCase:
|
|||
|
||||
self.checkJson(cfgPath, "tmq_taosx_tmp")
|
||||
self.checkData()
|
||||
self.checkDropData(False)
|
||||
|
||||
return
|
||||
|
||||
def checkWal1VgroupTable(self):
|
||||
buildPath = tdCom.getBuildPath()
|
||||
cfgPath = tdCom.getClientCfgPath()
|
||||
cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -t'%(buildPath, cfgPath)
|
||||
tdLog.info(cmdStr)
|
||||
os.system(cmdStr)
|
||||
|
||||
self.checkJson(cfgPath, "tmq_taosx_tmp")
|
||||
self.checkDataTable()
|
||||
|
||||
return
|
||||
|
||||
|
@ -155,6 +227,7 @@ class TDTestCase:
|
|||
os.system(cmdStr)
|
||||
|
||||
self.checkData()
|
||||
self.checkDropData(False)
|
||||
|
||||
return
|
||||
|
||||
|
@ -164,7 +237,7 @@ class TDTestCase:
|
|||
tdLog.info(cmdStr)
|
||||
os.system(cmdStr)
|
||||
|
||||
self.checkDropData()
|
||||
self.checkDropData(True)
|
||||
|
||||
return
|
||||
|
||||
|
@ -177,6 +250,19 @@ class TDTestCase:
|
|||
|
||||
self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot")
|
||||
self.checkData()
|
||||
self.checkDropData(False)
|
||||
|
||||
return
|
||||
|
||||
def checkSnapshot1VgroupTable(self):
|
||||
buildPath = tdCom.getBuildPath()
|
||||
cfgPath = tdCom.getClientCfgPath()
|
||||
cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -s -t'%(buildPath, cfgPath)
|
||||
tdLog.info(cmdStr)
|
||||
os.system(cmdStr)
|
||||
|
||||
self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot")
|
||||
self.checkDataTable()
|
||||
|
||||
return
|
||||
|
||||
|
@ -187,6 +273,7 @@ class TDTestCase:
|
|||
os.system(cmdStr)
|
||||
|
||||
self.checkData()
|
||||
self.checkDropData(False)
|
||||
|
||||
return
|
||||
|
||||
|
@ -196,7 +283,7 @@ class TDTestCase:
|
|||
tdLog.info(cmdStr)
|
||||
os.system(cmdStr)
|
||||
|
||||
self.checkDropData()
|
||||
self.checkDropData(True)
|
||||
|
||||
return
|
||||
|
||||
|
@ -205,6 +292,9 @@ class TDTestCase:
|
|||
self.checkWal1Vgroup()
|
||||
self.checkSnapshot1Vgroup()
|
||||
|
||||
self.checkWal1VgroupTable()
|
||||
self.checkSnapshot1VgroupTable()
|
||||
|
||||
self.checkWalMultiVgroups()
|
||||
self.checkSnapshotMultiVgroups()
|
||||
|
||||
|
|
|
@ -653,23 +653,23 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
|
|||
int32_t code = tmq_get_raw(msg, &raw);
|
||||
|
||||
if(code == TSDB_CODE_SUCCESS){
|
||||
int retCode = queryDB(pInfo->taos, "use metadb");
|
||||
if (retCode != 0) {
|
||||
taosFprintfFile(g_fp, "error when use metadb\n");
|
||||
taosCloseFile(&g_fp);
|
||||
exit(-1);
|
||||
}
|
||||
taosFprintfFile(g_fp, "raw:%p\n", &raw);
|
||||
|
||||
tmq_write_raw(pInfo->taos, raw);
|
||||
// int retCode = queryDB(pInfo->taos, "use metadb");
|
||||
// if (retCode != 0) {
|
||||
// taosFprintfFile(g_fp, "error when use metadb\n");
|
||||
// taosCloseFile(&g_fp);
|
||||
// exit(-1);
|
||||
// }
|
||||
// taosFprintfFile(g_fp, "raw:%p\n", &raw);
|
||||
//
|
||||
// tmq_write_raw(pInfo->taos, raw);
|
||||
}
|
||||
|
||||
char* result = tmq_get_json_meta(msg);
|
||||
if(result){
|
||||
if(result && strcmp(result, "") != 0){
|
||||
//printf("meta result: %s\n", result);
|
||||
taosFprintfFile(pInfo->pConsumeMetaFile, "%s\n", result);
|
||||
taosMemoryFree(result);
|
||||
}
|
||||
tmq_free_json_meta(result);
|
||||
}
|
||||
|
||||
totalRows++;
|
||||
|
@ -818,8 +818,12 @@ void loop_consume(SThreadInfo* pInfo) {
|
|||
tmq_res_t msgType = tmq_get_res_type(tmqMsg);
|
||||
if (msgType == TMQ_RES_TABLE_META) {
|
||||
totalRows += meta_msg_process(tmqMsg, pInfo, totalMsgs);
|
||||
} else if (msgType == TMQ_RES_DATA)
|
||||
} else if (msgType == TMQ_RES_DATA){
|
||||
totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs);
|
||||
} else if (msgType == TMQ_RES_METADATA){
|
||||
meta_msg_process(tmqMsg, pInfo, totalMsgs);
|
||||
totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs);
|
||||
}
|
||||
}
|
||||
|
||||
taos_free_result(tmqMsg);
|
||||
|
|
|
@ -54,24 +54,24 @@ static void msg_process(TAOS_RES* msg) {
|
|||
printf("db: %s\n", tmq_get_db_name(msg));
|
||||
printf("vg: %d\n", tmq_get_vgroup_id(msg));
|
||||
TAOS *pConn = use_db();
|
||||
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
|
||||
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META || tmq_get_res_type(msg) == TMQ_RES_METADATA) {
|
||||
char* result = tmq_get_json_meta(msg);
|
||||
if (result) {
|
||||
printf("meta result: %s\n", result);
|
||||
}
|
||||
if(g_fp){
|
||||
if(g_fp && strcmp(result, "") != 0){
|
||||
taosFprintfFile(g_fp, result);
|
||||
taosFprintfFile(g_fp, "\n");
|
||||
}
|
||||
|
||||
}
|
||||
tmq_free_json_meta(result);
|
||||
}
|
||||
|
||||
tmq_raw_data raw = {0};
|
||||
tmq_get_raw(msg, &raw);
|
||||
printf("write raw data type: %d\n", raw.raw_type);
|
||||
int32_t ret = tmq_write_raw(pConn, raw);
|
||||
printf("write raw data: %s\n", tmq_err2str(ret));
|
||||
|
||||
tmq_free_raw(raw);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
|
@ -309,6 +309,41 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes){
|
|||
}
|
||||
taos_free_result(pRes);
|
||||
}
|
||||
|
||||
pRes = taos_query(pConn,
|
||||
"create stable if not exists stt (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
|
||||
"nchar(8), t4 bool)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table stt, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn,
|
||||
"create stable if not exists sttb (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
|
||||
"nchar(8), t4 bool)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table sttb, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table if not exists stt1 using stt tags(2, \"stt1\", true) sttb1 using sttb tags(4, \"sttb1\", true) "
|
||||
"stt2 using stt tags(43, \"stt2\", false) sttb2 using sttb tags(54, \"sttb2\", true)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into stt3 using stt tags(23, \"stt3\", true) values(now + 1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') "
|
||||
"stt4 using stt tags(433, \"stt4\", false) values(now + 3s, 21, 21, 'stt4') sttb4 using sttb tags(543, \"sttb4\", true) values(now + 4s, 16, 25, 'sttb4')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -542,16 +577,47 @@ void initLogFile() {
|
|||
}
|
||||
|
||||
if(g_conf.snapShot){
|
||||
if(g_conf.subTable){
|
||||
char *result[] = {
|
||||
"{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":64},{\"name\":\"c4\",\"type\":5}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1},{\"name\":\"t2\",\"type\":8,\"length\":64}]}",
|
||||
"{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}",
|
||||
"{\"type\":\"create\",\"tableName\":\"ct1\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2000}]}",
|
||||
"{\"type\":\"create\",\"tableName\":\"ct2\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[]}",
|
||||
"{\"type\":\"create\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":5000}]}",
|
||||
"{\"type\":\"create\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c2\",\"type\":10,\"length\":8},{\"name\":\"cc3\",\"type\":5}],\"tags\":[]}",
|
||||
"{\"type\":\"create\",\"tableName\":\"jt\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"i\",\"type\":4}],\"tags\":[{\"name\":\"t\",\"type\":15}]}",
|
||||
"{\"type\":\"create\",\"tableName\":\"jt1\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}]}",
|
||||
"{\"type\":\"create\",\"tableName\":\"jt2\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"meters_summary\",\"columns\":[{\"name\":\"_wstart\",\"type\":9},{\"name\":\"current\",\"type\":6},{\"name\":\"groupid\",\"type\":4},{\"name\":\"location\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"group_id\",\"type\":14}]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"t_d2a450ee819dcf7576f0282d9ac22dbc\",\"using\":\"meters_summary\",\"tagNum\":1,\"tags\":[{\"name\":\"group_id\",\"type\":14,\"value\":1.313555008277358e+19}],\"createList\":[]}"
|
||||
};
|
||||
for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){
|
||||
taosFprintfFile(pFile2, result[i]);
|
||||
taosFprintfFile(pFile2, "\n");
|
||||
}
|
||||
}else{
|
||||
char *result[] = {
|
||||
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":64},{\"name\":\"c4\",\"type\":5}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1},{\"name\":\"t2\",\"type\":8,\"length\":64}]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct1\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2000}],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct3\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":5000}],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c2\",\"type\":10,\"length\":8},{\"name\":\"cc3\",\"type\":5}],\"tags\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"jt\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"i\",\"type\":4}],\"tags\":[{\"name\":\"t\",\"type\":15}]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt1\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt2\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"stt\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"sttb\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt1\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt1\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"sttb1\",\"using\":\"sttb\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":4},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb1\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt2\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":43},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt2\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":0}],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"sttb2\",\"using\":\"sttb\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":54},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb2\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt3\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":23},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt3\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"sttb3\",\"using\":\"sttb\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":4},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb3\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt4\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":433},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt4\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":0}],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"sttb4\",\"using\":\"sttb\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":543},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb4\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}"
|
||||
};
|
||||
for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){
|
||||
taosFprintfFile(pFile2, result[i]);
|
||||
taosFprintfFile(pFile2, "\n");
|
||||
}
|
||||
}
|
||||
}else{
|
||||
if(g_conf.subTable){
|
||||
char *result[] = {
|
||||
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"meters_summary\",\"columns\":[{\"name\":\"_wstart\",\"type\":9},{\"name\":\"current\",\"type\":6},{\"name\":\"groupid\",\"type\":4},{\"name\":\"location\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"group_id\",\"type\":14}]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"t_d2a450ee819dcf7576f0282d9ac22dbc\",\"using\":\"meters_summary\",\"tagNum\":1,\"tags\":[{\"name\":\"group_id\",\"type\":14,\"value\":1.313555008277358e+19}],\"createList\":[]}"
|
||||
};
|
||||
|
||||
for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){
|
||||
|
@ -560,24 +626,28 @@ void initLogFile() {
|
|||
}
|
||||
}else{
|
||||
char *result[] = {
|
||||
"{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}",
|
||||
"{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}",
|
||||
"{\"type\":\"create\",\"tableName\":\"ct1\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2000}]}",
|
||||
"{\"type\":\"create\",\"tableName\":\"ct2\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[]}",
|
||||
"{\"type\":\"create\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":3000}]}",
|
||||
"{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":5,\"colName\":\"c4\",\"colType\":5}",
|
||||
"{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":7,\"colName\":\"c3\",\"colType\":8,\"colLength\":64}",
|
||||
"{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":1,\"colName\":\"t2\",\"colType\":8,\"colLength\":64}",
|
||||
"{\"type\":\"alter\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"alterType\":4,\"colName\":\"t1\",\"colValue\":\"5000\",\"colValueNull\":false}",
|
||||
"{\"type\":\"create\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":10,\"length\":4}],\"tags\":[]}",
|
||||
"{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":5,\"colName\":\"c3\",\"colType\":5}",
|
||||
"{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":7,\"colName\":\"c2\",\"colType\":10,\"colLength\":8}",
|
||||
"{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":10,\"colName\":\"c3\",\"colNewName\":\"cc3\"}",
|
||||
"{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":9}",
|
||||
"{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":6,\"colName\":\"c1\"}",
|
||||
"{\"type\":\"create\",\"tableName\":\"jt\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"i\",\"type\":4}],\"tags\":[{\"name\":\"t\",\"type\":15}]}",
|
||||
"{\"type\":\"create\",\"tableName\":\"jt1\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}]}",
|
||||
"{\"type\":\"create\",\"tableName\":\"jt2\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[]}"
|
||||
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct1\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2000}],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct3\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":3000}],\"createList\":[]}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\",\"colType\":5}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":7,\"colName\":\"c3\",\"colType\":8,\"colLength\":64}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":1,\"colName\":\"t2\",\"colType\":8,\"colLength\":64}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"child\",\"tableName\":\"ct3\",\"alterType\":4,\"colName\":\"t1\",\"colValue\":\"5000\",\"colValueNull\":false}",
|
||||
"{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":10,\"length\":4}],\"tags\":[]}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":5,\"colName\":\"c3\",\"colType\":5}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":7,\"colName\":\"c2\",\"colType\":10,\"colLength\":8}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":10,\"colName\":\"c3\",\"colNewName\":\"cc3\"}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":9}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":6,\"colName\":\"c1\"}",
|
||||
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"jt\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"i\",\"type\":4}],\"tags\":[{\"name\":\"t\",\"type\":15}]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt1\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"jt2\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[],\"createList\":[]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"stt\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"sttb\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt1\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt1\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[{\"tableName\":\"stt1\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt1\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]},{\"tableName\":\"sttb1\",\"using\":\"sttb\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":4},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb1\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]},{\"tableName\":\"stt2\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":43},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt2\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":0}]},{\"tableName\":\"sttb2\",\"using\":\"sttb\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":54},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb2\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}]}",
|
||||
"{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"stt3\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":23},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt3\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[{\"tableName\":\"stt3\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":23},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt3\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]},{\"tableName\":\"sttb3\",\"using\":\"sttb\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":4},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb3\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]},{\"tableName\":\"stt4\",\"using\":\"stt\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":433},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"stt4\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":0}]},{\"tableName\":\"sttb4\",\"using\":\"sttb\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":543},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"sttb4\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}]}"
|
||||
};
|
||||
|
||||
for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){
|
||||
|
@ -585,6 +655,7 @@ void initLogFile() {
|
|||
taosFprintfFile(pFile2, "\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taosCloseFile(&pFile2);
|
||||
}
|
||||
|
@ -619,5 +690,6 @@ int main(int argc, char* argv[]) {
|
|||
tmq_t* tmq = build_consumer();
|
||||
tmq_list_t* topic_list = build_topic_list();
|
||||
basic_consume_loop(tmq, topic_list);
|
||||
tmq_list_destroy(topic_list);
|
||||
taosCloseFile(&g_fp);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue