feat:add tmq get data interface

This commit is contained in:
wangmm0220 2022-07-26 17:43:22 +08:00
parent 4ee32f4d4a
commit ba1962cacf
17 changed files with 531 additions and 152 deletions

View File

@ -29,7 +29,7 @@ static void msg_process(TAOS_RES* msg) {
printf("vg: %d\n", tmq_get_vgroup_id(msg)); printf("vg: %d\n", tmq_get_vgroup_id(msg));
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) { if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
tmq_raw_data raw = {0}; tmq_raw_data raw = {0};
int32_t code = tmq_get_raw_meta(msg, &raw); int32_t code = tmq_get_raw(msg, &raw);
if (code == 0) { if (code == 0) {
TAOS* pConn = taos_connect("192.168.1.86", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("192.168.1.86", "root", "taosdata", NULL, 0);
if (pConn == NULL) { if (pConn == NULL) {
@ -50,7 +50,7 @@ static void msg_process(TAOS_RES* msg) {
} }
taos_free_result(pRes); taos_free_result(pRes);
int32_t ret = taos_write_raw_meta(pConn, raw); int32_t ret = tmq_write_raw(pConn, raw);
printf("write raw data: %s\n", tmq_err2str(ret)); printf("write raw data: %s\n", tmq_err2str(ret));
taos_close(pConn); taos_close(pConn);
} }

View File

@ -49,18 +49,25 @@ static void msg_process(TAOS_RES* msg) {
printf("meta result: %s\n", result); printf("meta result: %s\n", result);
} }
tmq_free_json_meta(result); tmq_free_json_meta(result);
tmq_raw_data raw = {0};
tmq_get_raw_meta(msg, &raw);
int32_t ret = taos_write_raw_meta(pConn, raw);
printf("write raw meta: %s\n", tmq_err2str(ret));
} }
if(tmq_get_res_type(msg) == TMQ_RES_DATA){ tmq_raw_data raw = {0};
int32_t ret =taos_write_raw_data(pConn, msg); tmq_get_raw(msg, &raw);
printf("write raw data: %s\n", tmq_err2str(ret)); int32_t ret = tmq_write_raw(pConn, raw);
} printf("write raw data: %s\n", tmq_err2str(ret));
// else{
// while(1){
// int numOfRows = 0;
// void *pData = NULL;
// taos_fetch_raw_block(msg, &numOfRows, &pData);
// if(numOfRows == 0) break;
// printf("write data: tbname:%s, numOfRows:%d\n", tmq_get_table_name(msg), numOfRows);
// int ret = taos_write_raw_block(pConn, numOfRows, pData, tmq_get_table_name(msg));
// printf("write raw data: %s\n", tmq_err2str(ret));
// }
// }
taos_close(pConn); taos_close(pConn);
} }
@ -121,7 +128,7 @@ int32_t init_env() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct0 values(now, 1, 2, 'a')"); pRes = taos_query(pConn, "insert into ct0 values(1626006833600, 1, 2, 'a')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
@ -142,7 +149,7 @@ int32_t init_env() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct1 values(now, 3, 4, 'b')"); pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
@ -156,7 +163,7 @@ int32_t init_env() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct3 values(now, 5, 6, 'c') ct1 values(now+1s, 2, 3, 'sds') (now+2s, 4, 5, 'ddd') ct0 values(now+1s, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"); pRes = taos_query(pConn, "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(1626006833602, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
@ -177,7 +184,14 @@ int32_t init_env() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct3 values(now+7s, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (now+9s, 51, 62, 'c333', 940)"); pRes = taos_query(pConn, "insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (1626006833609, 51, 62, 'c333', 940)");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct3 select * from ct1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
@ -198,19 +212,26 @@ int32_t init_env() {
} }
taos_free_result(pRes); taos_free_result(pRes);
// pRes = taos_query(pConn, "drop table ct3 ct1"); pRes = taos_query(pConn, "delete from abc1 .ct3 where ts < 1626006833606");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes)); printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
// return -1; return -1;
// } }
// taos_free_result(pRes); taos_free_result(pRes);
//
// pRes = taos_query(pConn, "drop table st1"); pRes = taos_query(pConn, "drop table ct3 ct1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
// return -1; return -1;
// } }
// taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "drop table st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"); pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
@ -261,12 +282,12 @@ int32_t init_env() {
} }
taos_free_result(pRes); taos_free_result(pRes);
// pRes = taos_query(pConn, "drop table n1"); pRes = taos_query(pConn, "drop table n1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
// return -1; return -1;
// } }
// taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)"); pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
@ -289,21 +310,21 @@ int32_t init_env() {
} }
taos_free_result(pRes); taos_free_result(pRes);
// pRes = taos_query(pConn, pRes = taos_query(pConn,
// "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
// "nchar(8), t4 bool)"); "nchar(8), t4 bool)");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
// return -1; return -1;
// } }
// taos_free_result(pRes); taos_free_result(pRes);
//
// pRes = taos_query(pConn, "drop table st1"); pRes = taos_query(pConn, "drop table st1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
// return -1; return -1;
// } }
// taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
return 0; return 0;

View File

@ -260,17 +260,20 @@ enum tmq_res_t {
}; };
typedef struct tmq_raw_data{ typedef struct tmq_raw_data{
void* raw_meta; void* raw;
uint32_t raw_meta_len; uint32_t raw_len;
uint16_t raw_meta_type; uint16_t raw_type;
} tmq_raw_data; } tmq_raw_data;
typedef enum tmq_res_t tmq_res_t; typedef enum tmq_res_t tmq_res_t;
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, tmq_raw_data *raw_meta); DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw);
DLL_EXPORT int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta); DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw);
DLL_EXPORT int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *res); DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char* tbname);
DLL_EXPORT void tmq_free_raw(tmq_raw_data raw);
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed by tmq_free_json_meta DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed by tmq_free_json_meta
DLL_EXPORT void tmq_free_json_meta(char* jsonMeta); DLL_EXPORT void tmq_free_json_meta(char* jsonMeta);
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);

View File

@ -40,6 +40,7 @@ enum {
|| x == TDMT_VND_CREATE_TABLE \ || x == TDMT_VND_CREATE_TABLE \
|| x == TDMT_VND_ALTER_TABLE \ || x == TDMT_VND_ALTER_TABLE \
|| x == TDMT_VND_DROP_TABLE \ || x == TDMT_VND_DROP_TABLE \
|| x == TDMT_VND_DELETE \
) )
// clang-format on // clang-format on

View File

@ -3036,6 +3036,7 @@ typedef struct SDeleteRes {
int64_t skey; int64_t skey;
int64_t ekey; int64_t ekey;
int64_t affectedRows; int64_t affectedRows;
char tableFName[TSDB_TABLE_FNAME_LEN];
} SDeleteRes; } SDeleteRes;
int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);

View File

@ -38,6 +38,7 @@ typedef struct SDeleterRes {
int64_t skey; int64_t skey;
int64_t ekey; int64_t ekey;
int64_t affectedRows; int64_t affectedRows;
char tableFName[TSDB_TABLE_FNAME_LEN];
} SDeleterRes; } SDeleterRes;
typedef struct SDeleterParam { typedef struct SDeleterParam {

View File

@ -502,6 +502,7 @@ typedef struct SDataDeleterNode {
uint64_t tableId; uint64_t tableId;
int8_t tableType; // table type int8_t tableType; // table type
char tableFName[TSDB_TABLE_FNAME_LEN]; char tableFName[TSDB_TABLE_FNAME_LEN];
char tsColName[TSDB_COL_NAME_LEN];
STimeWindow deleteTimeRange; STimeWindow deleteTimeRange;
SNode* pAffectedRows; SNode* pAffectedRows;
} SDataDeleterNode; } SDataDeleterNode;

View File

@ -251,8 +251,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
(_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK) (_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK)
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
(_type) == TDMT_VND_DROP_STB) (_type) == TDMT_MND_DROP_STB)
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \ #define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \ ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \

View File

@ -1206,6 +1206,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp); tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
tDecoderClear(&decoder);
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
} else { } else {
ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP); ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
@ -1859,6 +1860,10 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) {
if (TD_RES_TMQ(res)) { if (TD_RES_TMQ(res)) {
return TMQ_RES_DATA; return TMQ_RES_DATA;
} else if (TD_RES_TMQ_META(res)) { } else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) {
return TMQ_RES_DATA;
}
return TMQ_RES_TABLE_META; return TMQ_RES_TABLE_META;
} else { } else {
return TMQ_RES_INVALID; return TMQ_RES_INVALID;
@ -1913,17 +1918,6 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return NULL; return NULL;
} }
int32_t tmq_get_raw_meta(TAOS_RES* res, tmq_raw_data *raw) {
if (TD_RES_TMQ_META(res) && raw) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
raw->raw_meta = pMetaRspObj->metaRsp.metaRsp;
raw->raw_meta_len = pMetaRspObj->metaRsp.metaRspLen;
raw->raw_meta_type = pMetaRspObj->metaRsp.resMsgType;
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_INVALID_PARA;
}
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
int8_t t) { int8_t t) {
char* string = NULL; char* string = NULL;
@ -2436,30 +2430,6 @@ _exit:
return string; return string;
} }
char* tmq_get_json_meta(TAOS_RES* res) {
if (!TD_RES_TMQ_META(res)) {
return NULL;
}
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB) {
return processCreateStb(&pMetaRspObj->metaRsp);
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB) {
return processAlterStb(&pMetaRspObj->metaRsp);
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB) {
return processDropSTable(&pMetaRspObj->metaRsp);
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE) {
return processCreateTable(&pMetaRspObj->metaRsp);
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE) {
return processAlterTable(&pMetaRspObj->metaRsp);
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) {
return processDropTable(&pMetaRspObj->metaRsp);
}
return NULL;
}
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
SVCreateStbReq req = {0}; SVCreateStbReq req = {0};
SDecoder coder; SDecoder coder;
@ -2531,6 +2501,13 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
pQuery.stableQuery = true; pQuery.stableQuery = true;
launchQueryImpl(pRequest, &pQuery, true, NULL); launchQueryImpl(pRequest, &pQuery, true, NULL);
if(pRequest->code == TSDB_CODE_SUCCESS){
SCatalog* pCatalog = NULL;
catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
catalogRemoveTableMeta(pCatalog, &tableName);
}
code = pRequest->code; code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg); taosMemoryFree(pCmdMsg.pMsg);
@ -2572,7 +2549,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
pReq.suid = req.suid; pReq.suid = req.suid;
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
SName tableName; SName tableName = {0};
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name); tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
SCmdMsgInfo pCmdMsg = {0}; SCmdMsgInfo pCmdMsg = {0};
@ -2593,6 +2570,13 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
pQuery.stableQuery = true; pQuery.stableQuery = true;
launchQueryImpl(pRequest, &pQuery, true, NULL); launchQueryImpl(pRequest, &pQuery, true, NULL);
if(pRequest->code == TSDB_CODE_SUCCESS){
SCatalog* pCatalog = NULL;
catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
catalogRemoveTableMeta(pCatalog, &tableName);
}
code = pRequest->code; code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg); taosMemoryFree(pCmdMsg.pMsg);
@ -2659,17 +2643,20 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
// loop to create table // loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq; pCreateReq = req.pReqs + iReq;
SVgroupInfo pInfo = {0}; SVgroupInfo pInfo = {0};
SName pName; SName pName = {0};
toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName); toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo); code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
taosArrayPush(pRequest->tableList, &pName);
SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId)); SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
if (pTableBatch == NULL) { if (pTableBatch == NULL) {
@ -2703,8 +2690,11 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
goto end; goto end;
} }
launchQueryImpl(pRequest, pQuery, false, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
pQuery = NULL; // no need to free in the end if (pRequest->code == TSDB_CODE_SUCCESS){
removeMeta(pTscObj, pRequest->tableList);
}
code = pRequest->code; code = pRequest->code;
end: end:
@ -2772,19 +2762,21 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
// loop to create table // loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pDropReq = req.pReqs + iReq; pDropReq = req.pReqs + iReq;
pDropReq->igNotExists = true; pDropReq->igNotExists = true;
SVgroupInfo pInfo = {0}; SVgroupInfo pInfo = {0};
SName pName; SName pName = {0};
toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName); toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo); code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
taosArrayPush(pRequest->tableList, &pName);
SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId)); SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
if (pTableBatch == NULL) { if (pTableBatch == NULL) {
SVgroupDropTableBatch tBatch = {0}; SVgroupDropTableBatch tBatch = {0};
@ -2815,8 +2807,10 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
goto end; goto end;
} }
launchQueryImpl(pRequest, pQuery, false, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
pQuery = NULL; // no need to free in the end if (pRequest->code == TSDB_CODE_SUCCESS){
removeMeta(pTscObj, pRequest->tableList);
}
code = pRequest->code; code = pRequest->code;
end: end:
@ -2827,6 +2821,70 @@ end:
return code; return code;
} }
// delete from db.tabl where .. -> delete from tabl where ..
// delete from db .tabl where .. -> delete from tabl where ..
static void getTbName(char *sql){
char *ch = sql;
bool inBackQuote = false;
int8_t dotIndex = 0;
while(*ch != '\0'){
if(!inBackQuote && *ch == '`'){
inBackQuote = true;
ch++;
continue;
}
if(inBackQuote && *ch == '`'){
inBackQuote = false;
ch++;
continue;
}
if(!inBackQuote && *ch == '.'){
dotIndex ++;
if(dotIndex == 2){
memmove(sql, ch + 1, strlen(ch + 1) + 1);
break;
}
}
ch++;
}
}
static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
SDeleteRes req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeDeleteRes(&coder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
goto end;
}
getTbName(req.tableFName);
char sql[256] = {0};
sprintf(sql, "delete from `%s` where `%s` >= %" PRId64" and `%s` <= %" PRId64, req.tableFName, "ts", req.skey, "ts", req.ekey);
printf("delete sql:%s\n", sql);
TAOS_RES* res = taos_query(taos, sql);
SRequestObj *pRequest = (SRequestObj *)res;
code = pRequest->code;
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
code = TSDB_CODE_SUCCESS;
}
taos_free_result(res);
end:
tDecoderClear(&coder);
return code;
}
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
SVAlterTbReq req = {0}; SVAlterTbReq req = {0};
SDecoder coder = {0}; SDecoder coder = {0};
@ -2914,15 +2972,21 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
goto end; goto end;
} }
launchQueryImpl(pRequest, pQuery, false, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
pQuery = NULL; // no need to free in the end
pVgData = NULL; pVgData = NULL;
pArray = NULL; pArray = NULL;
code = pRequest->code; code = pRequest->code;
if (code == TSDB_CODE_VND_TABLE_NOT_EXIST) { if (code == TSDB_CODE_VND_TABLE_NOT_EXIST) {
code = 0; code = TSDB_CODE_SUCCESS;
} }
if(pRequest->code == TSDB_CODE_SUCCESS){
SExecResult* pRes = &pRequest->body.resInfo.execRes;
if(pRes->res != NULL){
code = handleAlterTbExecRes(pRes->res, pCatalog);
}
}
end: end:
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
if (pVgData) taosMemoryFreeClear(pVgData->pData); if (pVgData) taosMemoryFreeClear(pVgData->pData);
@ -2933,27 +2997,6 @@ end:
return code; return code;
} }
int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta){
if (!taos) {
return TSDB_CODE_INVALID_PARA;
}
if(raw_meta.raw_meta_type == TDMT_VND_CREATE_STB) {
return taosCreateStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
}else if(raw_meta.raw_meta_type == TDMT_VND_ALTER_STB){
return taosCreateStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
}else if(raw_meta.raw_meta_type == TDMT_VND_DROP_STB){
return taosDropStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
}else if(raw_meta.raw_meta_type == TDMT_VND_CREATE_TABLE){
return taosCreateTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
}else if(raw_meta.raw_meta_type == TDMT_VND_ALTER_TABLE){
return taosAlterTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
}else if(raw_meta.raw_meta_type == TDMT_VND_DROP_TABLE){
return taosDropTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
}
return TSDB_CODE_INVALID_PARA;
}
typedef struct{ typedef struct{
SVgroupInfo vg; SVgroupInfo vg;
void *data; void *data;
@ -2964,15 +3007,196 @@ static void destroyVgHash(void* data) {
taosMemoryFreeClear(vgData->data); taosMemoryFreeClear(vgData->data);
} }
int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
if (!TD_RES_TMQ(msg)) { int32_t code = TSDB_CODE_SUCCESS;
uError("WriteRaw:msg is not tmq : %d", *(int8_t*)msg); STableMeta* pTableMeta = NULL;
return TSDB_CODE_TMQ_INVALID_MSG; SQuery *pQuery = NULL;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
if(!pRequest){
uError("WriteRaw:createRequest error request is null");
code = terrno;
goto end;
} }
if (!pRequest->pDb) {
uError("WriteRaw:not use db");
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
strcpy(pName.dbname, pRequest->pDb);
strcpy(pName.tname, tbname);
struct SCatalog *pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if(code != TSDB_CODE_SUCCESS){
uError("WriteRaw: get gatlog error");
goto end;
}
SRequestConnInfo conn = {0};
conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
conn.requestId = pRequest->requestId;
conn.requestObjRefId = pRequest->self;
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
SVgroupInfo vgData = {0};
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbname);
goto end;
}
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname);
goto end;
}
uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
uint64_t uid = pTableMeta->uid;
int32_t numOfCols = pTableMeta->tableInfo.numOfColumns;
uint16_t fLen = 0;
int32_t rowSize = 0;
int16_t nVar = 0;
for (int i = 0; i < numOfCols; i++) {
SSchema *schema = pTableMeta->schema + i;
fLen += TYPE_BYTES[schema->type];
rowSize += schema->bytes;
if(IS_VAR_DATA_TYPE(schema->type)){
nVar ++;
}
}
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
(int32_t)TD_BITMAP_BYTES(numOfCols - 1);
int32_t schemaLen = 0;
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
SSubmitReq* subReq = taosMemoryCalloc(1, totalLen);
SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq));
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
SRowBuilder rb = {0};
tdSRowInit(&rb, pTableMeta->sversion);
tdSRowSetTpInfo(&rb, numOfCols, fLen);
int32_t dataLen = 0;
char* pStart = pData + sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t));
int32_t* colLength = (int32_t*)pStart;
pStart += sizeof(int32_t) * numOfCols;
SResultColumn *pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn));
for (int32_t i = 0; i < numOfCols; ++i) {
if (IS_VAR_DATA_TYPE(pTableMeta->schema[i].type)) {
pCol[i].offset = (int32_t*)pStart;
pStart += rows * sizeof(int32_t);
} else {
pCol[i].nullbitmap = pStart;
pStart += BitmapLen(rows);
}
pCol[i].pData = pStart;
pStart += colLength[i];
}
for (int32_t j = 0; j < rows; j++) {
tdSRowResetBuf(&rb, rowData);
int32_t offset = 0;
for (int32_t k = 0; k < numOfCols; k++) {
const SSchema* pColumn = &pTableMeta->schema[k];
if (IS_VAR_DATA_TYPE(pColumn->type)) {
if (pCol[k].offset[j] != -1) {
char* data = pCol[k].pData + pCol[k].offset[j];
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
} else {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
}
} else {
if (!colDataIsNull_f(pCol[k].nullbitmap, j)) {
char* data = pCol[k].pData + pColumn->bytes * j;
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
} else {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
}
}
offset += TYPE_BYTES[pColumn->type];
}
int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen);
dataLen += rowLen;
}
taosMemoryFree(pCol);
blk->uid = htobe64(uid);
blk->suid = htobe64(suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(pTableMeta->sversion);
blk->schemaLen = htonl(schemaLen);
blk->numOfRows = htons(rows);
blk->dataLen = htonl(dataLen);
subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen;
subReq->numOfBlocks = 1;
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
if (NULL == pQuery) {
uError("create SQuery error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->haveResultSet = false;
pQuery->msgType = TDMT_VND_SUBMIT;
pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
if (NULL == pQuery->pRoot) {
uError("create pQuery->pRoot error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
SVnodeModifOpStmt *nodeStmt = (SVnodeModifOpStmt *)(pQuery->pRoot);
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
SVgDataBlocks *dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == dst) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto end;
}
dst->vg = vgData;
dst->numOfTables = subReq->numOfBlocks;
dst->size = subReq->length;
dst->pData = (char*)subReq;
subReq->header.vgId = htonl(dst->vg.vgId);
subReq->version = htonl(1);
subReq->header.contLen = htonl(subReq->length);
subReq->length = htonl(subReq->length);
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
subReq = NULL; // no need free
taosArrayPush(nodeStmt->pDataBlocks, &dst);
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
end:
taosMemoryFreeClear(pTableMeta);
qDestroyQuery(pQuery);
return code;
}
static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SHashObj *pVgHash = NULL; SHashObj *pVgHash = NULL;
SQuery *pQuery = NULL; SQuery *pQuery = NULL;
SMqRspObj rspObj = {0};
SDecoder decoder = {0};
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT); SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
@ -2981,6 +3205,17 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){
return terrno; return terrno;
} }
rspObj.resIter = -1;
rspObj.resType = RES_TYPE__TMQ;
tDecoderInit(&decoder, data, dataLen);
code = tDecodeSMqDataRsp(&decoder, &rspObj.rsp);
if (code != 0){
uError("WriteRaw:decode smqDataRsp error");
code = TSDB_CODE_INVALID_MSG;
goto end;
}
if (!pRequest->pDb) { if (!pRequest->pDb) {
uError("WriteRaw:not use db"); uError("WriteRaw:not use db");
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
@ -3001,18 +3236,18 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){
conn.requestId = pRequest->requestId; conn.requestId = pRequest->requestId;
conn.requestObjRefId = pRequest->self; conn.requestObjRefId = pRequest->self;
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
SMqRspObj *rspObj = ((SMqRspObj*)msg);
printf("raw data block num:%d\n", rspObj->rsp.blockNum); printf("raw data block num:%d\n", rspObj.rsp.blockNum);
while (++rspObj->resIter < rspObj->rsp.blockNum) { while (++rspObj.resIter < rspObj.rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj->rsp.blockData, rspObj->resIter); SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
if (!rspObj->rsp.withSchema) { if (!rspObj.rsp.withSchema) {
uError("WriteRaw:no schema, iter:%d", rspObj->resIter); uError("WriteRaw:no schema, iter:%d", rspObj.resIter);
goto end; goto end;
} }
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj->rsp.blockSchema, rspObj->resIter); SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
setResSchemaInfo(&rspObj->resInfo, pSW->pSchema, pSW->nCols); setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols);
code = setQueryResultFromRsp(&rspObj->resInfo, pRetrieve, false, false); code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false);
if(code != TSDB_CODE_SUCCESS){ if(code != TSDB_CODE_SUCCESS){
uError("WriteRaw: setQueryResultFromRsp error"); uError("WriteRaw: setQueryResultFromRsp error");
goto end; goto end;
@ -3030,13 +3265,13 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){
} }
} }
int32_t rows = rspObj->resInfo.numOfRows; int32_t rows = rspObj.resInfo.numOfRows;
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
(int32_t)TD_BITMAP_BYTES(pSW->nCols - 1); (int32_t)TD_BITMAP_BYTES(pSW->nCols - 1);
int32_t schemaLen = 0; int32_t schemaLen = 0;
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
const char* tbName = tmq_get_table_name(msg); const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
if(!tbName){ if(!tbName){
uError("WriteRaw: tbname is null"); uError("WriteRaw: tbname is null");
code = TSDB_CODE_TMQ_INVALID_MSG; code = TSDB_CODE_TMQ_INVALID_MSG;
@ -3108,13 +3343,13 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){
for (int32_t j = 0; j < rows; j++) { for (int32_t j = 0; j < rows; j++) {
tdSRowResetBuf(&rb, rowData); tdSRowResetBuf(&rb, rowData);
doSetOneRowPtr(&rspObj->resInfo); doSetOneRowPtr(&rspObj.resInfo);
rspObj->resInfo.current += 1; rspObj.resInfo.current += 1;
int32_t offset = 0; int32_t offset = 0;
for (int32_t k = 0; k < pSW->nCols; k++) { for (int32_t k = 0; k < pSW->nCols; k++) {
const SSchema* pColumn = &pSW->pSchema[k]; const SSchema* pColumn = &pSW->pSchema[k];
char *data = rspObj->resInfo.row[k]; char *data = rspObj.resInfo.row[k];
if (!data) { if (!data) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
} else { } else {
@ -3186,13 +3421,105 @@ int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
end: end:
tDecoderClear(&decoder);
taos_free_result(&rspObj);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
destroyRequest(pRequest); destroyRequest(pRequest);
taosHashCleanup(pVgHash); taosHashCleanup(pVgHash);
return code; return code;
} }
char* tmq_get_json_meta(TAOS_RES* res) {
if (!TD_RES_TMQ_META(res)) {
return NULL;
}
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB) {
return processCreateStb(&pMetaRspObj->metaRsp);
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB) {
return processAlterStb(&pMetaRspObj->metaRsp);
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB) {
return processDropSTable(&pMetaRspObj->metaRsp);
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE) {
return processCreateTable(&pMetaRspObj->metaRsp);
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE) {
return processAlterTable(&pMetaRspObj->metaRsp);
} else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) {
return processDropTable(&pMetaRspObj->metaRsp);
}
return NULL;
}
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) {
if (!raw || !res){
return TSDB_CODE_INVALID_PARA;
}
if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
raw->raw = pMetaRspObj->metaRsp.metaRsp;
raw->raw_len = pMetaRspObj->metaRsp.metaRspLen;
raw->raw_type = pMetaRspObj->metaRsp.resMsgType;
} else if(TD_RES_TMQ(res)){
SMqRspObj *rspObj = ((SMqRspObj*)res);
int32_t len = 0;
int32_t code = 0;
tEncodeSize(tEncodeSMqDataRsp, &rspObj->rsp, len, code);
if (code < 0) {
return -1;
}
void *buf = taosMemoryCalloc(1, len);
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, len);
tEncodeSMqDataRsp(&encoder, &rspObj->rsp);
tEncoderClear(&encoder);
raw->raw = buf;
raw->raw_len = len;
raw->raw_type = RES_TYPE__TMQ;
} else {
return TSDB_CODE_TMQ_INVALID_MSG;
}
return TSDB_CODE_SUCCESS;
}
void tmq_free_raw(tmq_raw_data raw) {
if (raw.raw_type == RES_TYPE__TMQ){
taosMemoryFree(raw.raw);
}
}
int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw){
if (!taos) {
return TSDB_CODE_INVALID_PARA;
}
if(raw.raw_type == TDMT_VND_CREATE_STB) {
return taosCreateStb(taos, raw.raw, raw.raw_len);
}else if(raw.raw_type == TDMT_VND_ALTER_STB){
return taosCreateStb(taos, raw.raw, raw.raw_len);
}else if(raw.raw_type == TDMT_VND_DROP_STB){
return taosDropStb(taos, raw.raw, raw.raw_len);
}else if(raw.raw_type == TDMT_VND_CREATE_TABLE){
return taosCreateTable(taos, raw.raw, raw.raw_len);
}else if(raw.raw_type == TDMT_VND_ALTER_TABLE){
return taosAlterTable(taos, raw.raw, raw.raw_len);
}else if(raw.raw_type == TDMT_VND_DROP_TABLE) {
return taosDropTable(taos, raw.raw, raw.raw_len);
}else if(raw.raw_type == TDMT_VND_DELETE){
return taosDeleteData(taos, raw.raw, raw.raw_len);
}else if(raw.raw_type == RES_TYPE__TMQ){
return tmqWriteRaw(taos, raw.raw, raw.raw_len);
}
return TSDB_CODE_INVALID_PARA;
}
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) { void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
// //
tmqCommitInner2(tmq, msg, 0, 1, cb, param); tmqCommitInner2(tmq, msg, 0, 1, cb, param);

View File

@ -5664,6 +5664,7 @@ int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
if (tEncodeI64(pCoder, pRes->ekey) < 0) return -1; if (tEncodeI64(pCoder, pRes->ekey) < 0) return -1;
if (tEncodeI64v(pCoder, pRes->affectedRows) < 0) return -1; if (tEncodeI64v(pCoder, pRes->affectedRows) < 0) return -1;
if (tEncodeCStr(pCoder, pRes->tableFName) < 0) return -1;
return 0; return 0;
} }
@ -5675,12 +5676,13 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) {
if (tDecodeI32v(pCoder, &nUid) < 0) return -1; if (tDecodeI32v(pCoder, &nUid) < 0) return -1;
for (int32_t iUid = 0; iUid < nUid; iUid++) { for (int32_t iUid = 0; iUid < nUid; iUid++) {
if (tDecodeU64(pCoder, &uid) < 0) return -1; if (tDecodeU64(pCoder, &uid) < 0) return -1;
taosArrayPush(pRes->uidList, &uid); if (pRes->uidList) taosArrayPush(pRes->uidList, &uid);
} }
if (tDecodeI64(pCoder, &pRes->skey) < 0) return -1; if (tDecodeI64(pCoder, &pRes->skey) < 0) return -1;
if (tDecodeI64(pCoder, &pRes->ekey) < 0) return -1; if (tDecodeI64(pCoder, &pRes->ekey) < 0) return -1;
if (tDecodeI64v(pCoder, &pRes->affectedRows) < 0) return -1; if (tDecodeI64v(pCoder, &pRes->affectedRows) < 0) return -1;
if (tDecodeCStrTo(pCoder, pRes->tableFName) < 0) return -1;
return 0; return 0;
} }
int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {

View File

@ -138,8 +138,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
} }
int32_t len; int32_t len = 0;
int32_t code; int32_t code = 0;
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
if (code < 0) { if (code < 0) {
return -1; return -1;
@ -156,9 +156,10 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
SEncoder encoder; SEncoder encoder = {0};
tEncoderInit(&encoder, abuf, len); tEncoderInit(&encoder, abuf, len);
tEncodeSMqDataRsp(&encoder, pRsp); tEncodeSMqDataRsp(&encoder, pRsp);
tEncoderClear(&encoder);
SRpcMsg rsp = { SRpcMsg rsp = {
.info = pMsg->info, .info = pMsg->info,
@ -168,8 +169,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
}; };
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
char buf1[80]; char buf1[80] = {0};
char buf2[80]; char buf2[80] = {0};
tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("vgId:%d from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s", tqDebug("vgId:%d from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",

View File

@ -106,7 +106,9 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
if (code) goto _err; if (code) {
goto _err;
}
// malloc and encode // malloc and encode
tEncodeSize(tEncodeDeleteRes, &res, size, ret); tEncodeSize(tEncodeDeleteRes, &res, size, ret);
@ -981,6 +983,11 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
SDecoder *pCoder = &(SDecoder){0}; SDecoder *pCoder = &(SDecoder){0};
SDeleteRes *pRes = &(SDeleteRes){0}; SDeleteRes *pRes = &(SDeleteRes){0};
pRsp->msgType = TDMT_VND_DELETE_RSP;
pRsp->pCont = NULL;
pRsp->contLen = 0;
pRsp->code = TSDB_CODE_SUCCESS;
pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t)); pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
if (pRes->uidList == NULL) { if (pRes->uidList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
@ -998,6 +1005,15 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
tDecoderClear(pCoder); tDecoderClear(pCoder);
taosArrayDestroy(pRes->uidList); taosArrayDestroy(pRes->uidList);
SVDeleteRsp rsp = {.affectedRows = pRes->affectedRows};
int32_t ret = 0;
tEncodeSize(tEncodeSVDeleteRsp, &rsp, pRsp->contLen, ret);
pRsp->pCont = rpcMallocCont(pRsp->contLen);
SEncoder ec = {0};
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
tEncodeSVDeleteRsp(&ec, &rsp);
tEncoderClear(&ec);
return code; return code;
_err: _err:

View File

@ -90,6 +90,7 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
pRes->uidList = pHandle->pParam->pUidList; pRes->uidList = pHandle->pParam->pUidList;
pRes->skey = pHandle->pDeleter->deleteTimeRange.skey; pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey; pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
strcpy(pRes->tableFName, pHandle->pDeleter->tableFName);
pRes->affectedRows = *(int64_t*)pColRes->pData; pRes->affectedRows = *(int64_t*)pColRes->pData;
pBuf->useSize += pEntry->dataLen; pBuf->useSize += pEntry->dataLen;

View File

@ -347,6 +347,9 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
} }
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam); code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
if(code != TSDB_CODE_SUCCESS){
taosMemoryFreeClear(pSinkParam);
}
} }
_error: _error:

View File

@ -279,7 +279,7 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
pRes->skey = pDelRes->skey; pRes->skey = pDelRes->skey;
pRes->ekey = pDelRes->ekey; pRes->ekey = pDelRes->ekey;
pRes->affectedRows = pDelRes->affectedRows; pRes->affectedRows = pDelRes->affectedRows;
strcpy(pRes->tableFName, pDelRes->tableFName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -230,6 +230,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SVDeleteRsp rsp = {0}; SVDeleteRsp rsp = {0};
tDecoderInit(&coder, msg, msgSize); tDecoderInit(&coder, msg, msgSize);
tDecodeSVDeleteRsp(&coder, &rsp); tDecodeSVDeleteRsp(&coder, &rsp);
tDecoderClear(&coder);
atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows); atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows);
SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows); SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows);

View File

@ -630,7 +630,7 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
{ {
tmq_raw_data raw = {0}; tmq_raw_data raw = {0};
int32_t code = tmq_get_raw_meta(msg, &raw); int32_t code = tmq_get_raw(msg, &raw);
if(code == TSDB_CODE_SUCCESS){ if(code == TSDB_CODE_SUCCESS){
int retCode = queryDB(pInfo->taos, "use metadb"); int retCode = queryDB(pInfo->taos, "use metadb");
@ -641,7 +641,7 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
} }
taosFprintfFile(g_fp, "raw:%p\n", &raw); taosFprintfFile(g_fp, "raw:%p\n", &raw);
taos_write_raw_meta(pInfo->taos, raw); tmq_write_raw(pInfo->taos, raw);
} }
char* result = tmq_get_json_meta(msg); char* result = tmq_get_json_meta(msg);