Merge branch '3.0' into feature/3_liaohj
This commit is contained in:
commit
dc90f22d99
111
examples/c/tmq.c
111
examples/c/tmq.c
|
@ -18,6 +18,7 @@
|
|||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include "taos.h"
|
||||
#include <stdlib.h>
|
||||
|
||||
static int running = 1;
|
||||
static void msg_process(TAOS_RES* msg) {
|
||||
|
@ -30,7 +31,11 @@ static void msg_process(TAOS_RES* msg) {
|
|||
void* meta;
|
||||
int32_t metaLen;
|
||||
tmq_get_raw_meta(msg, &meta, &metaLen);
|
||||
|
||||
char* result = tmq_get_json_meta(msg);
|
||||
if(result){
|
||||
printf("meta result: %s\n", result);
|
||||
free(result);
|
||||
}
|
||||
printf("meta, len is %d\n", metaLen);
|
||||
return;
|
||||
}
|
||||
|
@ -119,6 +124,104 @@ int32_t init_env() {
|
|||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table st1 add column c4 bigint");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table ct3 set tag t1=5000");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "drop table ct3 ct1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "drop table st1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table n1 add column c3 bigint");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table n1 rename column c3 cc3");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table n1 drop column c1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "drop table n1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -137,8 +240,8 @@ int32_t create_topic() {
|
|||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
/*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/
|
||||
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
|
||||
pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
|
||||
// pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
@ -199,7 +302,7 @@ tmq_t* build_consumer() {
|
|||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
|
||||
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||
tmq_conf_set(conf, "experimental.snapshot.enable", "false");
|
||||
|
||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
|
|
|
@ -263,6 +263,8 @@ typedef enum tmq_res_t tmq_res_t;
|
|||
|
||||
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
|
||||
DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, void **raw_meta, int32_t *raw_meta_len);
|
||||
DLL_EXPORT int32_t taos_write_raw_meta(TAOS *res, void *raw_meta, int32_t raw_meta_len);
|
||||
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed.
|
||||
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
|
||||
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
|
||||
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
|
||||
|
|
|
@ -209,6 +209,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_CONTINUE, "query-continue", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SCH_FETCH, "fetch", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SCH_MERGE_FETCH, "merge-fetch", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SCH_CANCEL_TASK, "cancel-task", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "drop-task", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL)
|
||||
|
|
|
@ -353,6 +353,7 @@ typedef struct SDownstreamSourceNode {
|
|||
uint64_t taskId;
|
||||
uint64_t schedId;
|
||||
int32_t execId;
|
||||
int32_t fetchMsgType;
|
||||
} SDownstreamSourceNode;
|
||||
|
||||
typedef struct SExchangePhysiNode {
|
||||
|
|
|
@ -102,7 +102,7 @@ void closeTransporter(SAppInstInfo *pAppInfo) {
|
|||
|
||||
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
|
||||
if (NEED_REDIRECT_ERROR(code)) {
|
||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) {
|
||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "tqueue.h"
|
||||
#include "tref.h"
|
||||
#include "ttimer.h"
|
||||
#include "cJSON.h"
|
||||
|
||||
int32_t tmqAskEp(tmq_t* tmq, bool async);
|
||||
|
||||
|
@ -1683,7 +1684,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
|
||||
SMqClientVg* pVg = pollRspWrapper->vgHandle;
|
||||
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
|
||||
pVg->currentOffsetNew = pollRspWrapper->metaRsp.rspOffsetNew;
|
||||
pVg->currentOffsetNew.version = pollRspWrapper->metaRsp.rspOffset;
|
||||
pVg->currentOffsetNew.type = TMQ_OFFSET__LOG;
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
// build rsp
|
||||
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
|
||||
|
@ -1848,6 +1850,406 @@ int32_t tmq_get_raw_meta(TAOS_RES* res, void** raw_meta, int32_t* raw_meta_len)
|
|||
return -1;
|
||||
}
|
||||
|
||||
static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t){
|
||||
char* string = NULL;
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
return string;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("create");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
|
||||
char uid[32] = {0};
|
||||
sprintf(uid, "%"PRIi64, id);
|
||||
cJSON* id_ = cJSON_CreateString(uid);
|
||||
cJSON_AddItemToObject(json, "id", id_);
|
||||
cJSON* tableName = cJSON_CreateString(name);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
// cJSON* version = cJSON_CreateNumber(1);
|
||||
// cJSON_AddItemToObject(json, "version", version);
|
||||
|
||||
cJSON* columns = cJSON_CreateArray();
|
||||
for(int i = 0; i < schemaRow->nCols; i++){
|
||||
cJSON* column = cJSON_CreateObject();
|
||||
SSchema *s = schemaRow->pSchema + i;
|
||||
cJSON* cname = cJSON_CreateString(s->name);
|
||||
cJSON_AddItemToObject(column, "name", cname);
|
||||
cJSON* ctype = cJSON_CreateNumber(s->type);
|
||||
cJSON_AddItemToObject(column, "type", ctype);
|
||||
if(s->type == TSDB_DATA_TYPE_BINARY){
|
||||
int32_t length = s->bytes - VARSTR_HEADER_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(column, "length", cbytes);
|
||||
}else if (s->type == TSDB_DATA_TYPE_NCHAR){
|
||||
int32_t length = (s->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(column, "length", cbytes);
|
||||
}
|
||||
cJSON_AddItemToArray(columns, column);
|
||||
}
|
||||
cJSON_AddItemToObject(json, "columns", columns);
|
||||
|
||||
cJSON* tags = cJSON_CreateArray();
|
||||
for(int i = 0; schemaTag && i < schemaTag->nCols; i++){
|
||||
cJSON* tag = cJSON_CreateObject();
|
||||
SSchema *s = schemaTag->pSchema + i;
|
||||
cJSON* tname = cJSON_CreateString(s->name);
|
||||
cJSON_AddItemToObject(tag, "name", tname);
|
||||
cJSON* ttype = cJSON_CreateNumber(s->type);
|
||||
cJSON_AddItemToObject(tag, "type", ttype);
|
||||
if(s->type == TSDB_DATA_TYPE_BINARY){
|
||||
int32_t length = s->bytes - VARSTR_HEADER_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(tag, "length", cbytes);
|
||||
}else if (s->type == TSDB_DATA_TYPE_NCHAR){
|
||||
int32_t length = (s->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(tag, "length", cbytes);
|
||||
}
|
||||
cJSON_AddItemToArray(tags, tag);
|
||||
}
|
||||
cJSON_AddItemToObject(json, "tags", tags);
|
||||
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
cJSON_Delete(json);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *processCreateStb(SMqMetaRsp *metaRsp){
|
||||
SVCreateStbReq req = {0};
|
||||
SDecoder coder;
|
||||
char* string = NULL;
|
||||
|
||||
// decode and process req
|
||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
|
||||
tDecoderInit(&coder, data, len);
|
||||
|
||||
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
|
||||
tDecoderClear(&coder);
|
||||
return string;
|
||||
|
||||
_err:
|
||||
tDecoderClear(&coder);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t id){
|
||||
char* string = NULL;
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
return string;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("create");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
char cid[32] = {0};
|
||||
sprintf(cid, "%"PRIi64, id);
|
||||
cJSON* cid_ = cJSON_CreateString(cid);
|
||||
cJSON_AddItemToObject(json, "id", cid_);
|
||||
|
||||
cJSON* tableName = cJSON_CreateString(name);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* tableType = cJSON_CreateString("child");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
|
||||
char sid_[32] = {0};
|
||||
sprintf(sid_, "%"PRIi64, sid);
|
||||
cJSON* using = cJSON_CreateString(sid_);
|
||||
cJSON_AddItemToObject(json, "using", using);
|
||||
// cJSON* version = cJSON_CreateNumber(1);
|
||||
// cJSON_AddItemToObject(json, "version", version);
|
||||
|
||||
cJSON* tags = cJSON_CreateArray();
|
||||
|
||||
if (tTagIsJson(pTag)) { // todo
|
||||
char* pJson = parseTagDatatoJson(pTag);
|
||||
|
||||
cJSON* tag = cJSON_CreateObject();
|
||||
cJSON* tname = cJSON_CreateString("unknown"); // todo
|
||||
cJSON_AddItemToObject(tag, "name", tname);
|
||||
cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
|
||||
cJSON_AddItemToObject(tag, "type", ttype);
|
||||
cJSON* tvalue = cJSON_CreateString(pJson);
|
||||
cJSON_AddItemToObject(tag, "value", tvalue);
|
||||
cJSON_AddItemToArray(tags, tag);
|
||||
cJSON_AddItemToObject(json, "tags", tags);
|
||||
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
goto end;
|
||||
}
|
||||
|
||||
SArray* pTagVals = NULL;
|
||||
int32_t code = tTagToValArray(pTag, &pTagVals);
|
||||
if (code) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
for(int i = 0; i < taosArrayGetSize(pTagVals); i++){
|
||||
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
|
||||
|
||||
cJSON* tag = cJSON_CreateObject();
|
||||
// cJSON* tname = cJSON_CreateNumber(pTagVal->cid);
|
||||
cJSON* tname = cJSON_CreateString("unkonwn"); // todo
|
||||
cJSON_AddItemToObject(tag, "name", tname);
|
||||
cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
|
||||
cJSON_AddItemToObject(tag, "type", ttype);
|
||||
|
||||
char* buf = NULL;
|
||||
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
|
||||
buf = taosMemoryCalloc(pTagVal->nData + 1, 1);
|
||||
dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
|
||||
} else {
|
||||
buf = taosMemoryCalloc(32, 1);
|
||||
dataConverToStr(buf, pTagVal->type, &pTagVal->i64, tDataTypes[pTagVal->type].bytes, NULL);
|
||||
}
|
||||
|
||||
cJSON* tvalue = cJSON_CreateString(buf);
|
||||
taosMemoryFree(buf);
|
||||
cJSON_AddItemToObject(tag, "value", tvalue);
|
||||
cJSON_AddItemToArray(tags, tag);
|
||||
}
|
||||
cJSON_AddItemToObject(json, "tags", tags);
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
|
||||
end:
|
||||
|
||||
cJSON_Delete(json);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *processCreateTable(SMqMetaRsp *metaRsp){
|
||||
SDecoder decoder = {0};
|
||||
SVCreateTbBatchReq req = {0};
|
||||
SVCreateTbReq *pCreateReq;
|
||||
char *string = NULL;
|
||||
// decode
|
||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
|
||||
tDecoderInit(&decoder, data, len);
|
||||
if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// loop to create table
|
||||
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||
pCreateReq = req.pReqs + iReq;
|
||||
if(pCreateReq->type == TSDB_CHILD_TABLE){
|
||||
string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.suid, pCreateReq->name, pCreateReq->uid);
|
||||
}else if(pCreateReq->type == TSDB_NORMAL_TABLE){
|
||||
string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
|
||||
}
|
||||
}
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
_exit:
|
||||
tDecoderClear(&decoder);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *processAlterTable(SMqMetaRsp *metaRsp){
|
||||
SDecoder decoder = {0};
|
||||
SVAlterTbReq vAlterTbReq = {0};
|
||||
char *string = NULL;
|
||||
|
||||
// decode
|
||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
|
||||
tDecoderInit(&decoder, data, len);
|
||||
if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
goto _exit;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("alter");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
// cJSON* uid = cJSON_CreateNumber(id);
|
||||
// cJSON_AddItemToObject(json, "uid", uid);
|
||||
cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
|
||||
switch (vAlterTbReq.action) {
|
||||
case TSDB_ALTER_TABLE_ADD_COLUMN: {
|
||||
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_ADD_COLUMN);
|
||||
cJSON_AddItemToObject(json, "alterType", alterType);
|
||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||
cJSON_AddItemToObject(json, "colName", colName);
|
||||
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
|
||||
cJSON_AddItemToObject(json, "colType", colType);
|
||||
|
||||
if(vAlterTbReq.type == TSDB_DATA_TYPE_BINARY){
|
||||
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(json, "colLength", cbytes);
|
||||
}else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR){
|
||||
int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(json, "colLength", cbytes);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_ALTER_TABLE_DROP_COLUMN:{
|
||||
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_DROP_COLUMN);
|
||||
cJSON_AddItemToObject(json, "alterType", alterType);
|
||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||
cJSON_AddItemToObject(json, "colName", colName);
|
||||
break;
|
||||
}
|
||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:{
|
||||
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES);
|
||||
cJSON_AddItemToObject(json, "alterType", alterType);
|
||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||
cJSON_AddItemToObject(json, "colName", colName);
|
||||
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
|
||||
cJSON_AddItemToObject(json, "colType", colType);
|
||||
if(vAlterTbReq.type == TSDB_DATA_TYPE_BINARY){
|
||||
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(json, "colLength", cbytes);
|
||||
}else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR){
|
||||
int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(json, "colLength", cbytes);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:{
|
||||
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME);
|
||||
cJSON_AddItemToObject(json, "alterType", alterType);
|
||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||
cJSON_AddItemToObject(json, "colName", colName);
|
||||
cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
|
||||
cJSON_AddItemToObject(json, "colNewName", colNewName);
|
||||
break;
|
||||
}
|
||||
case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:{
|
||||
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_TAG_VAL);
|
||||
cJSON_AddItemToObject(json, "alterType", alterType);
|
||||
cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
|
||||
cJSON_AddItemToObject(json, "colName", tagName);
|
||||
cJSON* colValue = cJSON_CreateString("invalid, todo"); // todo
|
||||
cJSON_AddItemToObject(json, "colValue", colValue);
|
||||
cJSON* isNull = cJSON_CreateBool(vAlterTbReq.isNull);
|
||||
cJSON_AddItemToObject(json, "colValueNull", isNull);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
|
||||
_exit:
|
||||
tDecoderClear(&decoder);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *processDropSTable(SMqMetaRsp *metaRsp){
|
||||
SDecoder decoder = {0};
|
||||
SVDropStbReq req = {0};
|
||||
char *string = NULL;
|
||||
|
||||
// decode
|
||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
|
||||
tDecoderInit(&decoder, data, len);
|
||||
if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
goto _exit;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("drop");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
char uid[32] = {0};
|
||||
sprintf(uid, "%"PRIi64, req.suid);
|
||||
cJSON* id = cJSON_CreateString(uid);
|
||||
cJSON_AddItemToObject(json, "id", id);
|
||||
cJSON* tableName = cJSON_CreateString(req.name);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* tableType = cJSON_CreateString("super");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
|
||||
_exit:
|
||||
tDecoderClear(&decoder);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *processDropTable(SMqMetaRsp *metaRsp){
|
||||
SDecoder decoder = {0};
|
||||
SVDropTbBatchReq req = {0};
|
||||
char *string = NULL;
|
||||
|
||||
// decode
|
||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
|
||||
tDecoderInit(&decoder, data, len);
|
||||
if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
goto _exit;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("drop");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
// cJSON* uid = cJSON_CreateNumber(id);
|
||||
// cJSON_AddItemToObject(json, "uid", uid);
|
||||
// cJSON* tableType = cJSON_CreateString("normal");
|
||||
// cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
|
||||
cJSON* tableNameList = cJSON_CreateArray();
|
||||
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||
SVDropTbReq* pDropTbReq = req.pReqs + iReq;
|
||||
|
||||
cJSON* tableName = cJSON_CreateString(pDropTbReq->name); // todo
|
||||
cJSON_AddItemToArray(tableNameList, tableName);
|
||||
}
|
||||
cJSON_AddItemToObject(json, "tableNameList", tableNameList);
|
||||
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
|
||||
_exit:
|
||||
tDecoderClear(&decoder);
|
||||
return string;
|
||||
}
|
||||
|
||||
char *tmq_get_json_meta(TAOS_RES *res){
|
||||
if (!TD_RES_TMQ_META(res)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
|
||||
if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB){
|
||||
return processCreateStb(&pMetaRspObj->metaRsp);
|
||||
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB){
|
||||
return processCreateStb(&pMetaRspObj->metaRsp);
|
||||
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB){
|
||||
return processDropSTable(&pMetaRspObj->metaRsp);
|
||||
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE){
|
||||
return processCreateTable(&pMetaRspObj->metaRsp);
|
||||
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE){
|
||||
return processAlterTable(&pMetaRspObj->metaRsp);
|
||||
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE){
|
||||
return processDropTable(&pMetaRspObj->metaRsp);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
|
||||
tmqCommitInner2(tmq, msg, 0, 1, cb, param);
|
||||
}
|
||||
|
|
|
@ -214,6 +214,7 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -111,6 +111,7 @@ SArray *qmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
|
|
|
@ -328,6 +328,7 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -238,9 +238,9 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
|||
}
|
||||
|
||||
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeMsg);
|
||||
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeWriteMsg);
|
||||
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
|
||||
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyMsg);
|
||||
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg);
|
||||
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
||||
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
|
||||
|
||||
|
|
|
@ -42,7 +42,7 @@ static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
|
|||
|
||||
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
|
||||
pMsg->info.hasEpSet = 1;
|
||||
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
||||
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType};
|
||||
int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);
|
||||
|
||||
rsp.pCont = rpcMallocCont(contLen);
|
||||
|
@ -88,6 +88,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
|
||||
case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
|
||||
case TDMT_SCH_FETCH_RSP:
|
||||
case TDMT_SCH_MERGE_FETCH_RSP:
|
||||
qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0);
|
||||
return;
|
||||
case TDMT_MND_STATUS_RSP:
|
||||
|
@ -253,7 +254,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
|
|||
static bool rpcRfp(int32_t code, tmsg_t msgType) {
|
||||
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
||||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) {
|
||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) {
|
||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
|
|
@ -531,7 +531,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
|
|||
if (!IsReq(pMsg)) return 0;
|
||||
if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
|
||||
pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
|
||||
pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) {
|
||||
pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH ||
|
||||
pMsg->msgType == TDMT_SCH_DROP_TASK) {
|
||||
return 0;
|
||||
}
|
||||
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
|
||||
|
|
|
@ -45,6 +45,7 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
|
|||
code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg, 0);
|
||||
break;
|
||||
case TDMT_SCH_FETCH:
|
||||
case TDMT_SCH_MERGE_FETCH:
|
||||
code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg, 0);
|
||||
break;
|
||||
case TDMT_SCH_DROP_TASK:
|
||||
|
@ -72,6 +73,7 @@ int32_t mndInitQuery(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_QUERY, mndProcessQueryMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_FETCH, mndProcessQueryMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg);
|
||||
|
||||
|
|
|
@ -86,6 +86,7 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
|
|||
code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg, ts);
|
||||
break;
|
||||
case TDMT_SCH_FETCH:
|
||||
case TDMT_SCH_MERGE_FETCH:
|
||||
code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts);
|
||||
break;
|
||||
case TDMT_SCH_FETCH_RSP:
|
||||
|
|
|
@ -51,15 +51,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
|||
void vnodeDestroy(const char *path, STfs *pTfs);
|
||||
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
||||
void vnodeClose(SVnode *pVnode);
|
||||
int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg);
|
||||
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
|
||||
int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
|
||||
|
||||
int32_t vnodeStart(SVnode *pVnode);
|
||||
void vnodeStop(SVnode *pVnode);
|
||||
int64_t vnodeGetSyncHandle(SVnode *pVnode);
|
||||
|
@ -68,14 +60,25 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId);
|
|||
int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever);
|
||||
int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader);
|
||||
int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData);
|
||||
|
||||
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
|
||||
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
|
||||
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
|
||||
void *vnodeGetIdx(SVnode *pVnode);
|
||||
void *vnodeGetIvtIdx(SVnode *pVnode);
|
||||
|
||||
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
||||
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
|
||||
|
||||
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||
|
||||
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
|
||||
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
||||
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
||||
|
||||
// meta
|
||||
typedef struct SMeta SMeta; // todo: remove
|
||||
|
|
|
@ -94,6 +94,8 @@ int32_t vnodeAsyncCommit(SVnode* pVnode);
|
|||
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
|
||||
void vnodeSyncStart(SVnode* pVnode);
|
||||
void vnodeSyncClose(SVnode* pVnode);
|
||||
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
|
||||
bool vnodeIsLeader(SVnode* pVnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
|
|||
p->pTableList = pTableIdList;
|
||||
|
||||
for (int32_t i = 0; i < p->numOfCols; ++i) {
|
||||
if (IS_VAR_DATA_TYPE(colId[i])) {
|
||||
if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
|
||||
p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo
|
|||
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
|
||||
int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
SDecoder dc = {0};
|
||||
|
||||
|
@ -133,7 +133,7 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
|
||||
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
|
||||
void *ptr = NULL;
|
||||
void *pReq;
|
||||
int32_t len;
|
||||
|
@ -169,7 +169,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
case TDMT_VND_DROP_TTL_TABLE:
|
||||
//if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
case TDMT_VND_CREATE_SMA: {
|
||||
if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
|
@ -261,6 +261,11 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
vTrace("message in vnode query queue is processing");
|
||||
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_SCH_QUERY:
|
||||
|
@ -276,11 +281,18 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||
vTrace("message in fetch queue is processing");
|
||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG)
|
||||
&& !vnodeIsLeader(pVnode)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_SCH_FETCH:
|
||||
case TDMT_SCH_MERGE_FETCH:
|
||||
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||
case TDMT_SCH_FETCH_RSP:
|
||||
return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0);
|
||||
|
|
|
@ -120,7 +120,24 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
return code;
|
||||
}
|
||||
|
||||
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
SEpSet newEpSet = {0};
|
||||
syncGetRetryEpSet(pVnode->sync, &newEpSet);
|
||||
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
|
||||
newEpSet.numOfEps, newEpSet.inUse);
|
||||
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
|
||||
vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i, newEpSet.eps[i].fqdn,
|
||||
newEpSet.eps[i].port);
|
||||
}
|
||||
pMsg->info.hasEpSet = 1;
|
||||
|
||||
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType + 1};
|
||||
tmsgSendRedirectRsp(&rsp, &newEpSet);
|
||||
}
|
||||
|
||||
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnode *pVnode = pInfo->ahandle;
|
||||
int32_t vgId = pVnode->config.vgId;
|
||||
int32_t code = 0;
|
||||
|
@ -131,7 +148,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
const STraceId *trace = &pMsg->info.traceId;
|
||||
vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle);
|
||||
|
||||
code = vnodePreProcessReq(pVnode, pMsg);
|
||||
code = vnodePreProcessWriteMsg(pVnode, pMsg);
|
||||
if (code != 0) {
|
||||
vError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
|
||||
} else {
|
||||
|
@ -141,7 +158,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType));
|
||||
if (code > 0) {
|
||||
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
||||
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
|
||||
if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
|
||||
rsp.code = terrno;
|
||||
vError("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr());
|
||||
}
|
||||
|
@ -156,16 +173,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
vnodeAccumBlockMsg(pVnode, pMsg->msgType);
|
||||
} else if (code < 0) {
|
||||
if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
||||
SEpSet newEpSet = {0};
|
||||
syncGetRetryEpSet(pVnode->sync, &newEpSet);
|
||||
vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps,
|
||||
newEpSet.inUse);
|
||||
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
|
||||
vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
|
||||
}
|
||||
pMsg->info.hasEpSet = 1;
|
||||
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
||||
tmsgSendRedirectRsp(&rsp, &newEpSet);
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||
} else {
|
||||
if (terrno != 0) code = terrno;
|
||||
vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code);
|
||||
|
@ -185,7 +193,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
vnodeWaitBlockMsg(pVnode);
|
||||
}
|
||||
|
||||
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnode *pVnode = pInfo->ahandle;
|
||||
int32_t vgId = pVnode->config.vgId;
|
||||
int32_t code = 0;
|
||||
|
@ -199,7 +207,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
|
||||
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
||||
if (rsp.code == 0) {
|
||||
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
|
||||
if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
|
||||
rsp.code = terrno;
|
||||
vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
|
||||
}
|
||||
|
@ -265,6 +273,11 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
ASSERT(pSyncMsg != NULL);
|
||||
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
|
||||
SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
|
||||
ASSERT(pSyncMsg != NULL);
|
||||
code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
|
||||
syncClientRequestBatchDestroyDeep(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
||||
ASSERT(pSyncMsg != NULL);
|
||||
|
@ -508,3 +521,17 @@ void vnodeSyncStart(SVnode *pVnode) {
|
|||
}
|
||||
|
||||
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
|
||||
|
||||
bool vnodeIsLeader(SVnode *pVnode) {
|
||||
if (!syncIsReady(pVnode->sync)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// todo
|
||||
// if (!pVnode->restored) {
|
||||
// terrno = TSDB_CODE_APP_NOT_READY;
|
||||
// return false;
|
||||
// }
|
||||
|
||||
return true;
|
||||
}
|
|
@ -2053,7 +2053,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
|
|||
pMsgSendInfo->param = pWrapper;
|
||||
pMsgSendInfo->msgInfo.pData = pMsg;
|
||||
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
|
||||
pMsgSendInfo->msgType = TDMT_SCH_FETCH;
|
||||
pMsgSendInfo->msgType = pSource->fetchMsgType;
|
||||
pMsgSendInfo->fp = loadRemoteDataCallback;
|
||||
|
||||
int64_t transporterId = 0;
|
||||
|
|
|
@ -549,7 +549,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
|||
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
|
||||
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
||||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) {
|
||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) {
|
||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
|
|
@ -595,6 +595,7 @@ static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstre
|
|||
COPY_SCALAR_FIELD(taskId);
|
||||
COPY_SCALAR_FIELD(schedId);
|
||||
COPY_SCALAR_FIELD(execId);
|
||||
COPY_SCALAR_FIELD(fetchMsgType);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -3538,6 +3538,7 @@ static const char* jkDownstreamSourceAddr = "Addr";
|
|||
static const char* jkDownstreamSourceTaskId = "TaskId";
|
||||
static const char* jkDownstreamSourceSchedId = "SchedId";
|
||||
static const char* jkDownstreamSourceExecId = "ExecId";
|
||||
static const char* jkDownstreamSourceFetchMsgType = "FetchMsgType";
|
||||
|
||||
static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj;
|
||||
|
@ -3552,6 +3553,9 @@ static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceExecId, pNode->execId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceFetchMsgType, pNode->fetchMsgType);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -3569,6 +3573,9 @@ static int32_t jsonToDownstreamSourceNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkDownstreamSourceExecId, &pNode->execId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkDownstreamSourceFetchMsgType, &pNode->fetchMsgType);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -1067,6 +1067,10 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
|
|||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setConditionsSlotId(pCxt, (const SLogicNode*)pWindowLogicNode, (SPhysiNode*)pWindow);
|
||||
}
|
||||
|
||||
pWindow->triggerType = pWindowLogicNode->triggerType;
|
||||
pWindow->watermark = pWindowLogicNode->watermark;
|
||||
pWindow->igExpired = pWindowLogicNode->igExpired;
|
||||
|
|
|
@ -265,7 +265,6 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
|
|||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
if (bufSize < 0) {
|
||||
// tscError("invalid buf size");
|
||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
|
@ -276,7 +275,20 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
|
|||
*(str + bufSize + 1) = '"';
|
||||
n = bufSize + 2;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
if (bufSize < 0) {
|
||||
// tscError("invalid buf size");
|
||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
|
||||
*str = '"';
|
||||
int32_t length = taosUcs4ToMbs((TdUcs4 *)buf, bufSize, str + 1);
|
||||
if (length <= 0) {
|
||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
*(str + length + 1) = '"';
|
||||
n = length + 2;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
n = sprintf(str, "%d", *(uint8_t*)buf);
|
||||
break;
|
||||
|
@ -298,7 +310,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
|
|||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
|
||||
*len = n;
|
||||
if(len) *len = n;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -123,6 +123,7 @@ typedef struct SQWTaskCtx {
|
|||
int8_t taskType;
|
||||
int8_t explain;
|
||||
int32_t queryType;
|
||||
int32_t fetchType;
|
||||
int32_t execId;
|
||||
|
||||
bool queryFetched;
|
||||
|
|
|
@ -35,8 +35,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes);
|
|||
|
||||
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code);
|
||||
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code);
|
||||
int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
|
||||
int32_t code);
|
||||
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code);
|
||||
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
|
||||
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
|
||||
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo);
|
||||
|
|
|
@ -104,7 +104,7 @@ int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
|
||||
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
|
||||
if (NULL == pRsp) {
|
||||
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
|
||||
|
@ -112,7 +112,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i
|
|||
}
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.msgType = TDMT_SCH_FETCH_RSP,
|
||||
.msgType = rspType,
|
||||
.pCont = pRsp,
|
||||
.contLen = sizeof(*pRsp) + dataLength,
|
||||
.code = code,
|
||||
|
@ -436,7 +436,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
|||
int64_t rId = 0;
|
||||
int32_t eId = msg->execId;
|
||||
|
||||
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
|
||||
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info, .msgType = pMsg->msgType};
|
||||
|
||||
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);
|
||||
|
||||
|
|
|
@ -606,7 +606,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
qwMsg->connInfo = ctx->dataConnInfo;
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||
|
||||
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
|
||||
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
|
||||
rsp = NULL;
|
||||
|
||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
|
||||
|
@ -628,7 +628,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
rsp = NULL;
|
||||
|
||||
qwMsg->connInfo = ctx->dataConnInfo;
|
||||
qwBuildAndSendFetchRsp(&qwMsg->connInfo, NULL, 0, code);
|
||||
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, NULL, 0, code);
|
||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
||||
0);
|
||||
}
|
||||
|
@ -661,6 +661,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
|
||||
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
|
||||
|
||||
ctx->queryType = qwMsg->msgType;
|
||||
|
||||
SOutputData sOutput = {0};
|
||||
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
|
||||
|
||||
|
@ -711,7 +713,7 @@ _return:
|
|||
}
|
||||
|
||||
if (code || rsp) {
|
||||
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
|
||||
qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
|
||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
||||
dataLen);
|
||||
}
|
||||
|
|
|
@ -214,7 +214,8 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
|
|||
rpcFreeCont(rsp);
|
||||
break;
|
||||
}
|
||||
case TDMT_SCH_FETCH_RSP: {
|
||||
case TDMT_SCH_FETCH_RSP:
|
||||
case TDMT_SCH_MERGE_FETCH_RSP: {
|
||||
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont;
|
||||
|
||||
if (0 == pRsp->code && 0 == rsp->completed) {
|
||||
|
@ -815,6 +816,7 @@ void *fetchQueueThread(void *param) {
|
|||
|
||||
switch (fetchRpc->msgType) {
|
||||
case TDMT_SCH_FETCH:
|
||||
case TDMT_SCH_MERGE_FETCH:
|
||||
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0);
|
||||
break;
|
||||
case TDMT_SCH_CANCEL_TASK:
|
||||
|
|
|
@ -35,7 +35,7 @@ extern "C" {
|
|||
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
|
||||
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
|
||||
|
||||
#define SCH_TASK_MAX_EXEC_TIMES 5
|
||||
#define SCH_TASK_MAX_EXEC_TIMES 8
|
||||
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
|
||||
|
||||
enum {
|
||||
|
@ -318,6 +318,7 @@ extern SSchedulerMgmt schMgmt;
|
|||
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
|
||||
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
|
||||
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
|
||||
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_SRC_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
|
||||
|
||||
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
|
||||
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
|
||||
|
@ -327,7 +328,7 @@ extern SSchedulerMgmt schMgmt;
|
|||
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
|
||||
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
|
||||
#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0))
|
||||
#define SCH_NEED_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH)
|
||||
#define SCH_NEED_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
|
||||
#define SCH_NEED_REDIRECT(_msgType, _code, _rspLen) (SCH_NEED_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_SUB_TASK_NETWORK_ERR(_code, _rspLen)))
|
||||
#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_NEED_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
|
||||
|
||||
|
|
|
@ -44,6 +44,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
|
|||
// SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
case TDMT_SCH_FETCH_RSP:
|
||||
case TDMT_SCH_MERGE_FETCH_RSP:
|
||||
if (lastMsgType != reqMsgType && -1 != lastMsgType) {
|
||||
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
|
||||
TMSG_INFO(msgType));
|
||||
|
@ -304,7 +305,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
|
|||
}
|
||||
break;
|
||||
}
|
||||
case TDMT_SCH_FETCH_RSP: {
|
||||
case TDMT_SCH_FETCH_RSP:
|
||||
case TDMT_SCH_MERGE_FETCH_RSP: {
|
||||
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
|
||||
|
||||
SCH_ERR_JRET(rspCode);
|
||||
|
@ -558,6 +560,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
|
|||
case TDMT_VND_DELETE:
|
||||
case TDMT_SCH_EXPLAIN:
|
||||
case TDMT_SCH_FETCH:
|
||||
case TDMT_SCH_MERGE_FETCH:
|
||||
*fp = schHandleCallback;
|
||||
break;
|
||||
case TDMT_SCH_DROP_TASK:
|
||||
|
@ -1016,7 +1019,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
persistHandle = true;
|
||||
break;
|
||||
}
|
||||
case TDMT_SCH_FETCH: {
|
||||
case TDMT_SCH_FETCH:
|
||||
case TDMT_SCH_MERGE_FETCH: {
|
||||
msgSize = sizeof(SResFetchReq);
|
||||
msg = taosMemoryCalloc(1, msgSize);
|
||||
if (NULL == msg) {
|
||||
|
|
|
@ -258,7 +258,9 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
|||
.taskId = pTask->taskId,
|
||||
.schedId = schMgmt.sId,
|
||||
.execId = pTask->execId,
|
||||
.addr = pTask->succeedAddr};
|
||||
.addr = pTask->succeedAddr,
|
||||
.fetchMsgType = SCH_FETCH_TYPE(pTask),
|
||||
};
|
||||
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
|
||||
SCH_UNLOCK(SCH_WRITE, &parent->lock);
|
||||
|
||||
|
@ -818,7 +820,7 @@ int32_t schLaunchFetchTask(SSchJob *pJob) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_SCH_FETCH));
|
||||
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask)));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
|
|
|
@ -648,8 +648,6 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
|
|||
}
|
||||
|
||||
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
|
||||
int32_t ret = 0;
|
||||
|
||||
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
taosReleaseRef(tsNodeRefId, rid);
|
||||
|
@ -657,8 +655,8 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
|
|||
return -1;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
ret = syncNodePropose(pSyncNode, pMsg, isWeak);
|
||||
|
||||
int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak);
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
return ret;
|
||||
}
|
||||
|
@ -669,15 +667,14 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t ret = 0;
|
||||
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize);
|
||||
|
||||
int32_t ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize);
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -314,7 +314,6 @@ static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2
|
|||
|
||||
static int tdbBtreeOpenImpl(SBTree *pBt) {
|
||||
// Try to get the root page of the an existing btree
|
||||
|
||||
SPgno pgno;
|
||||
SPage *pPage;
|
||||
int ret;
|
||||
|
@ -1993,6 +1992,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
|
|||
const void *pTKey;
|
||||
int tkLen;
|
||||
|
||||
tdbTrace("ttl moveto, pager:%p, ipage:%d", pPager, pBtc->iPage);
|
||||
if (pBtc->iPage < 0) {
|
||||
// move from a clear cursor
|
||||
ret = tdbPagerFetchPage(pPager, &pBt->root, &(pBtc->pPage), tdbBtreeInitPage,
|
||||
|
|
|
@ -121,9 +121,10 @@ SPager *tdbEnvGetPager(TDB *pDb, const char *fname) {
|
|||
|
||||
hash = tdbCstringHash(fname);
|
||||
ppPager = &pDb->pgrHash[hash % pDb->nPgrHash];
|
||||
tdbTrace("tdbttl getPager1: pager:%p, index:%d, name:%s", *ppPager, hash % pDb->nPgrHash, fname);
|
||||
for (; *ppPager && (strcmp(fname, (*ppPager)->dbFileName) != 0); ppPager = &((*ppPager)->pHashNext)) {
|
||||
}
|
||||
|
||||
tdbTrace("tdbttl getPager2: pager:%p, index:%d, name:%s", *ppPager, hash % pDb->nPgrHash, fname);
|
||||
return *ppPager;
|
||||
}
|
||||
|
||||
|
@ -143,9 +144,12 @@ void tdbEnvAddPager(TDB *pDb, SPager *pPager) {
|
|||
// add to hash
|
||||
hash = tdbCstringHash(pPager->dbFileName);
|
||||
ppPager = &pDb->pgrHash[hash % pDb->nPgrHash];
|
||||
tdbTrace("tdbttl addPager1: pager:%p, index:%d, name:%s", *ppPager, hash % pDb->nPgrHash, pPager->dbFileName);
|
||||
pPager->pHashNext = *ppPager;
|
||||
*ppPager = pPager;
|
||||
|
||||
tdbTrace("tdbttl addPager2: pager:%p, index:%d, name:%s", *ppPager, hash % pDb->nPgrHash, pPager->dbFileName);
|
||||
|
||||
// increase the counter
|
||||
pDb->nPager++;
|
||||
}
|
||||
|
|
|
@ -230,6 +230,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
|
|||
}
|
||||
}
|
||||
|
||||
tdbTrace("tdbttl commit:%p, %d", pPager, pPager->dbOrigSize);
|
||||
pPager->dbOrigSize = pPager->dbFileSize;
|
||||
|
||||
// release the page
|
||||
|
@ -285,6 +286,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
|
|||
return -1;
|
||||
}
|
||||
|
||||
tdbTrace("tdbttl fetch pager:%p", pPage->pPager);
|
||||
// init page if need
|
||||
if (!TDB_PAGE_INITIALIZED(pPage)) {
|
||||
ret = tdbPagerInitPage(pPager, pPage, initPage, arg, loadPage);
|
||||
|
@ -363,10 +365,12 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
|
|||
|
||||
pgno = TDB_PAGE_PGNO(pPage);
|
||||
|
||||
tdbTrace("tdbttl init pager:%p, pgno:%d, loadPage:%d, size:%d", pPager, pgno, loadPage, pPager->dbOrigSize);
|
||||
if (loadPage && pgno <= pPager->dbOrigSize) {
|
||||
init = 1;
|
||||
|
||||
nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * (pgno - 1));
|
||||
tdbTrace("tdbttl pager:%p, pgno:%d, nRead:%ld", pPager, pgno, nRead);
|
||||
if (nRead < pPage->pageSize) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
|
|
|
@ -33,9 +33,9 @@ class TDTestCase:
|
|||
def init(self, conn, logSql):
|
||||
self.testcasePath = os.path.split(__file__)[0]
|
||||
self.testcaseFilename = os.path.split(__file__)[-1]
|
||||
os.system("rm -rf %s/%s.sql" % (self.testcasePath,self.testcaseFilename))
|
||||
# os.system("rm -rf %s/%s.sql" % (self.testcasePath,self.testcaseFilename))
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
tdSql.init(conn.cursor(), True)
|
||||
|
||||
def run(self):
|
||||
# tdSql.prepare()
|
||||
|
|
|
@ -27,8 +27,7 @@ python3 ./test.py -f 1-insert/table_comment.py
|
|||
python3 ./test.py -f 1-insert/time_range_wise.py
|
||||
python3 ./test.py -f 1-insert/block_wise.py
|
||||
python3 ./test.py -f 1-insert/create_retentions.py
|
||||
|
||||
#python3 ./test.py -f 1-insert/table_param_ttl.py
|
||||
python3 ./test.py -f 1-insert/table_param_ttl.py
|
||||
|
||||
python3 ./test.py -f 2-query/between.py
|
||||
python3 ./test.py -f 2-query/distinct.py
|
||||
|
@ -118,7 +117,7 @@ python3 ./test.py -f 2-query/distribute_agg_avg.py
|
|||
python3 ./test.py -f 2-query/distribute_agg_stddev.py
|
||||
python3 ./test.py -f 2-query/twa.py
|
||||
python3 ./test.py -f 2-query/irate.py
|
||||
python3 ./test.py -f 2-query/and_or_for_byte.py
|
||||
#python3 ./test.py -f 2-query/and_or_for_byte.py
|
||||
|
||||
python3 ./test.py -f 2-query/function_null.py
|
||||
python3 ./test.py -f 2-query/queryQnode.py
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 7105027650b51e701cfa1dac11b8fb42d447dd01
|
||||
Subproject commit c9308b04810faa653548db5f07c753a9d00990eb
|
Loading…
Reference in New Issue