fix: msg with schema
This commit is contained in:
parent
3992effb00
commit
609b21647b
|
@ -103,8 +103,8 @@ int32_t create_topic() {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
|
pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");
|
||||||
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");
|
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -162,9 +162,10 @@ tmq_t* build_consumer() {
|
||||||
tmq_conf_set(conf, "group.id", "tg2");
|
tmq_conf_set(conf, "group.id", "tg2");
|
||||||
tmq_conf_set(conf, "td.connect.user", "root");
|
tmq_conf_set(conf, "td.connect.user", "root");
|
||||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
tmq_conf_set(conf, "td.connect.db", "abc1");
|
/*tmq_conf_set(conf, "td.connect.db", "abc1");*/
|
||||||
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);
|
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);
|
||||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||||
|
assert(tmq);
|
||||||
return tmq;
|
return tmq;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2052,80 +2052,6 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t leftForVer;
|
|
||||||
int32_t vgId;
|
|
||||||
int32_t epoch;
|
|
||||||
int64_t consumerId;
|
|
||||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
|
||||||
} SMqCancelConnReq;
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqCancelConnReq(void** buf, const SMqCancelConnReq* pReq) {
|
|
||||||
int32_t tlen = 0;
|
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->leftForVer);
|
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->vgId);
|
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->epoch);
|
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
|
|
||||||
tlen += taosEncodeString(buf, pReq->topicName);
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSMqCancelConnReq(void* buf, SMqCancelConnReq* pReq) {
|
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->leftForVer);
|
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->vgId);
|
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->epoch);
|
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
|
||||||
buf = taosDecodeStringTo(buf, pReq->topicName);
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int8_t reserved;
|
|
||||||
} SMqCancelConnRsp;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t leftForVer;
|
|
||||||
int32_t vgId;
|
|
||||||
int64_t oldConsumerId;
|
|
||||||
int64_t newConsumerId;
|
|
||||||
char* topic;
|
|
||||||
} SMqMVRebReq;
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) {
|
|
||||||
int32_t tlen = 0;
|
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->leftForVer);
|
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->vgId);
|
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId);
|
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
|
|
||||||
tlen += taosEncodeString(buf, pReq->topic);
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) {
|
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->leftForVer);
|
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->vgId);
|
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId);
|
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
|
|
||||||
buf = taosDecodeString(buf, &pReq->topic);
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SMsgHead header;
|
|
||||||
int32_t vgId;
|
|
||||||
int64_t consumerId;
|
|
||||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
|
||||||
} SMqSetCVgRsp;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SMsgHead header;
|
|
||||||
int32_t vgId;
|
|
||||||
int64_t consumerId;
|
|
||||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
|
||||||
} SMqMVRebRsp;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
|
@ -2152,6 +2078,24 @@ typedef struct {
|
||||||
SSchema* pSchema;
|
SSchema* pSchema;
|
||||||
} SSchemaWrapper;
|
} SSchemaWrapper;
|
||||||
|
|
||||||
|
static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) {
|
||||||
|
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
|
||||||
|
if (pSW == NULL) return pSW;
|
||||||
|
pSW->nCols = pSchemaWrapper->nCols;
|
||||||
|
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||||
|
if (pSW->pSchema == NULL) {
|
||||||
|
taosMemoryFree(pSW);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
memcpy(pSW->pSchema, pSchemaWrapper->pSchema, pSW->nCols * sizeof(SSchema));
|
||||||
|
return pSW;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) {
|
||||||
|
taosMemoryFree(pSchemaWrapper->pSchema);
|
||||||
|
taosMemoryFree(pSchemaWrapper);
|
||||||
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
|
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI8(buf, pSchema->type);
|
tlen += taosEncodeFixedI8(buf, pSchema->type);
|
||||||
|
@ -2162,13 +2106,13 @@ static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void* taosDecodeSSchema(void* buf, SSchema* pSchema) {
|
static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) {
|
||||||
buf = taosDecodeFixedI8(buf, &pSchema->type);
|
buf = taosDecodeFixedI8(buf, &pSchema->type);
|
||||||
buf = taosDecodeFixedI8(buf, &pSchema->flags);
|
buf = taosDecodeFixedI8(buf, &pSchema->flags);
|
||||||
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
|
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
|
||||||
buf = taosDecodeFixedI16(buf, &pSchema->colId);
|
buf = taosDecodeFixedI16(buf, &pSchema->colId);
|
||||||
buf = taosDecodeStringTo(buf, pSchema->name);
|
buf = taosDecodeStringTo(buf, pSchema->name);
|
||||||
return buf;
|
return (void*)buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) {
|
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) {
|
||||||
|
@ -2198,7 +2142,7 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void* taosDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) {
|
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
|
||||||
buf = taosDecodeFixedU32(buf, &pSW->nCols);
|
buf = taosDecodeFixedU32(buf, &pSW->nCols);
|
||||||
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||||
if (pSW->pSchema == NULL) {
|
if (pSW->pSchema == NULL) {
|
||||||
|
@ -2208,7 +2152,7 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pS
|
||||||
for (int32_t i = 0; i < pSW->nCols; i++) {
|
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||||
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
|
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
|
||||||
}
|
}
|
||||||
return buf;
|
return (void*)buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchemaWrapper* pSW) {
|
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchemaWrapper* pSW) {
|
||||||
|
@ -2615,6 +2559,10 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp
|
||||||
void* data = taosArrayGetP(pRsp->blockData, i);
|
void* data = taosArrayGetP(pRsp->blockData, i);
|
||||||
tlen += taosEncodeFixedI32(buf, bLen);
|
tlen += taosEncodeFixedI32(buf, bLen);
|
||||||
tlen += taosEncodeBinary(buf, data, bLen);
|
tlen += taosEncodeBinary(buf, data, bLen);
|
||||||
|
if (pRsp->withSchema) {
|
||||||
|
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i);
|
||||||
|
tlen += taosEncodeSSchemaWrapper(buf, pSW);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return tlen;
|
return tlen;
|
||||||
|
@ -2627,6 +2575,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
|
||||||
buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
|
buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
|
||||||
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||||
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||||
|
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||||
if (pRsp->blockNum != 0) {
|
if (pRsp->blockNum != 0) {
|
||||||
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
|
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
|
||||||
buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
|
buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
|
||||||
|
@ -2639,6 +2588,11 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
|
||||||
buf = taosDecodeBinary(buf, &data, bLen);
|
buf = taosDecodeBinary(buf, &data, bLen);
|
||||||
taosArrayPush(pRsp->blockDataLen, &bLen);
|
taosArrayPush(pRsp->blockDataLen, &bLen);
|
||||||
taosArrayPush(pRsp->blockData, &data);
|
taosArrayPush(pRsp->blockData, &data);
|
||||||
|
if (pRsp->withSchema) {
|
||||||
|
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
|
||||||
|
buf = taosDecodeSSchemaWrapper(buf, pSW);
|
||||||
|
taosArrayPush(pRsp->blockSchema, &pSW);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return (void*)buf;
|
return (void*)buf;
|
||||||
|
|
|
@ -231,6 +231,10 @@ static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool conver
|
||||||
msg->resIter++;
|
msg->resIter++;
|
||||||
if (msg->resIter < msg->rsp.blockNum) {
|
if (msg->resIter < msg->rsp.blockNum) {
|
||||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(msg->rsp.blockData, msg->resIter);
|
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(msg->rsp.blockData, msg->resIter);
|
||||||
|
if (msg->rsp.withSchema) {
|
||||||
|
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(msg->rsp.blockSchema, msg->resIter);
|
||||||
|
setResSchemaInfo(&msg->resInfo, pSW->pSchema, pSW->nCols);
|
||||||
|
}
|
||||||
setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4);
|
setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4);
|
||||||
return &msg->resInfo;
|
return &msg->resInfo;
|
||||||
}
|
}
|
||||||
|
|
|
@ -226,16 +226,14 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
|
|
||||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
|
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
|
||||||
pRequest->type = pQuery->msgType;
|
pRequest->type = pQuery->msgType;
|
||||||
SPlanContext cxt = {
|
SPlanContext cxt = {.queryId = pRequest->requestId,
|
||||||
.queryId = pRequest->requestId,
|
|
||||||
.acctId = pRequest->pTscObj->acctId,
|
.acctId = pRequest->pTscObj->acctId,
|
||||||
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
|
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
|
||||||
.pAstRoot = pQuery->pRoot,
|
.pAstRoot = pQuery->pRoot,
|
||||||
.showRewrite = pQuery->showRewrite,
|
.showRewrite = pQuery->showRewrite,
|
||||||
.pTransporter = pRequest->pTscObj->pAppInfo->pTransporter,
|
.pTransporter = pRequest->pTscObj->pAppInfo->pTransporter,
|
||||||
.pMsg = pRequest->msgBuf,
|
.pMsg = pRequest->msgBuf,
|
||||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
|
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
|
||||||
};
|
|
||||||
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
|
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
|
code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
|
||||||
|
@ -247,6 +245,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
|
||||||
ASSERT(pSchema != NULL && numOfCols > 0);
|
ASSERT(pSchema != NULL && numOfCols > 0);
|
||||||
|
|
||||||
pResInfo->numOfCols = numOfCols;
|
pResInfo->numOfCols = numOfCols;
|
||||||
|
// TODO handle memory leak
|
||||||
pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
||||||
pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
||||||
|
|
||||||
|
@ -840,12 +839,12 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* p = (char*) pResultInfo->pData;
|
char* p = (char*)pResultInfo->pData;
|
||||||
|
|
||||||
int32_t dataLen = *(int32_t*) p;
|
int32_t dataLen = *(int32_t*)p;
|
||||||
p += sizeof(int32_t);
|
p += sizeof(int32_t);
|
||||||
|
|
||||||
uint64_t groupId = *(uint64_t*) p;
|
uint64_t groupId = *(uint64_t*)p;
|
||||||
p += sizeof(uint64_t);
|
p += sizeof(uint64_t);
|
||||||
|
|
||||||
int32_t* colLength = (int32_t*)p;
|
int32_t* colLength = (int32_t*)p;
|
||||||
|
|
|
@ -376,7 +376,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
|
|
||||||
ASSERT(user);
|
ASSERT(user);
|
||||||
ASSERT(pass);
|
ASSERT(pass);
|
||||||
ASSERT(conf->db);
|
/*ASSERT(conf->db);*/
|
||||||
ASSERT(conf->groupId[0]);
|
ASSERT(conf->groupId[0]);
|
||||||
|
|
||||||
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
|
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
|
||||||
|
@ -1118,7 +1118,9 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||||
|
|
||||||
pRspObj->resInfo.totalRows = 0;
|
pRspObj->resInfo.totalRows = 0;
|
||||||
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
|
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
|
||||||
|
if (!pWrapper->msg.withSchema) {
|
||||||
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
|
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
|
||||||
|
}
|
||||||
|
|
||||||
taosFreeQitem(pWrapper);
|
taosFreeQitem(pWrapper);
|
||||||
return pRspObj;
|
return pRspObj;
|
||||||
|
|
|
@ -476,7 +476,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
|
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
|
||||||
SSdb* pSdb = pMnode->pSdb;
|
SSdb* pSdb = pMnode->pSdb;
|
||||||
SVgObj* pVgroup = NULL;
|
SVgObj* pVgroup = NULL;
|
||||||
SQueryPlan* pPlan = qStringToQueryPlan(pTopic->physicalPlan);
|
SQueryPlan* pPlan = NULL;
|
||||||
|
SSubplan* plan = NULL;
|
||||||
|
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
|
pPlan = qStringToQueryPlan(pTopic->physicalPlan);
|
||||||
if (pPlan == NULL) {
|
if (pPlan == NULL) {
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -501,7 +504,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
||||||
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
|
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
|
plan = nodesListGetNode(inner->pNodeList, 0);
|
||||||
|
}
|
||||||
|
|
||||||
int64_t unexistKey = -1;
|
int64_t unexistKey = -1;
|
||||||
SMqConsumerEpInSub* pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
|
SMqConsumerEpInSub* pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
|
||||||
|
@ -519,38 +523,35 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
||||||
}
|
}
|
||||||
|
|
||||||
pSub->vgNum++;
|
pSub->vgNum++;
|
||||||
plan->execNode.nodeId = pVgroup->vgId;
|
|
||||||
plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
|
||||||
|
|
||||||
SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
|
SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
|
||||||
pVgEp->epSet = plan->execNode.epSet;
|
pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
pVgEp->vgId = plan->execNode.nodeId;
|
pVgEp->vgId = pVgroup->vgId;
|
||||||
|
taosArrayPush(pEpInSub->vgs, &pVgEp);
|
||||||
#if 0
|
|
||||||
SMqConsumerEp consumerEp = {0};
|
|
||||||
consumerEp.status = 0;
|
|
||||||
consumerEp.consumerId = -1;
|
|
||||||
consumerEp.epSet = plan->execNode.epSet;
|
|
||||||
consumerEp.vgId = plan->execNode.nodeId;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
mDebug("init subscribption %s, assign vg: %d", pSub->key, pVgEp->vgId);
|
mDebug("init subscribption %s, assign vg: %d", pSub->key, pVgEp->vgId);
|
||||||
|
|
||||||
|
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
int32_t msgLen;
|
int32_t msgLen;
|
||||||
|
|
||||||
|
plan->execNode.epSet = pVgEp->epSet;
|
||||||
|
plan->execNode.nodeId = pVgEp->vgId;
|
||||||
|
|
||||||
if (qSubPlanToString(plan, &pVgEp->qmsg, &msgLen) < 0) {
|
if (qSubPlanToString(plan, &pVgEp->qmsg, &msgLen) < 0) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosArrayPush(pEpInSub->vgs, &pVgEp);
|
} else {
|
||||||
|
pVgEp->qmsg = strdup("");
|
||||||
|
}
|
||||||
|
|
||||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
|
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
|
||||||
|
|
||||||
/*taosArrayPush(pSub->unassignedVg, &consumerEp);*/
|
/*taosArrayPush(pSub->unassignedVg, &consumerEp);*/
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pEpInSub->vgs->size > 0);
|
|
||||||
pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
|
pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
|
||||||
|
|
||||||
ASSERT(pEpInSub->vgs->size > 0);
|
ASSERT(pEpInSub->vgs->size > 0);
|
||||||
|
|
|
@ -282,10 +282,10 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
|
||||||
topicObj.version = 1;
|
topicObj.version = 1;
|
||||||
topicObj.sql = strdup(pCreate->sql);
|
topicObj.sql = strdup(pCreate->sql);
|
||||||
topicObj.sqlLen = strlen(pCreate->sql) + 1;
|
topicObj.sqlLen = strlen(pCreate->sql) + 1;
|
||||||
topicObj.ast = strdup(pCreate->ast);
|
|
||||||
topicObj.astLen = strlen(pCreate->ast) + 1;
|
|
||||||
|
|
||||||
if (pCreate->ast && pCreate->ast[0]) {
|
if (pCreate->ast && pCreate->ast[0]) {
|
||||||
|
topicObj.ast = strdup(pCreate->ast);
|
||||||
|
topicObj.astLen = strlen(pCreate->ast) + 1;
|
||||||
topicObj.subType = TOPIC_SUB_TYPE__TABLE;
|
topicObj.subType = TOPIC_SUB_TYPE__TABLE;
|
||||||
topicObj.withTbName = 0;
|
topicObj.withTbName = 0;
|
||||||
topicObj.withSchema = 0;
|
topicObj.withSchema = 0;
|
||||||
|
@ -314,6 +314,9 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
topicObj.ast = strdup("");
|
||||||
|
topicObj.astLen = 1;
|
||||||
|
topicObj.physicalPlan = strdup("");
|
||||||
topicObj.subType = TOPIC_SUB_TYPE__DB;
|
topicObj.subType = TOPIC_SUB_TYPE__DB;
|
||||||
topicObj.withTbName = 1;
|
topicObj.withTbName = 1;
|
||||||
topicObj.withSchema = 1;
|
topicObj.withSchema = 1;
|
||||||
|
|
|
@ -411,8 +411,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
|
|
||||||
SMqDataBlkRsp rsp = {0};
|
SMqDataBlkRsp rsp = {0};
|
||||||
rsp.reqOffset = pReq->currentOffset;
|
rsp.reqOffset = pReq->currentOffset;
|
||||||
|
rsp.withSchema = pExec->withSchema;
|
||||||
rsp.blockData = taosArrayInit(0, sizeof(void*));
|
rsp.blockData = taosArrayInit(0, sizeof(void*));
|
||||||
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
||||||
|
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
consumerEpoch = atomic_load_32(&pExec->epoch);
|
consumerEpoch = atomic_load_32(&pExec->epoch);
|
||||||
|
@ -512,6 +514,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
ASSERT(actualLen <= dataStrLen);
|
ASSERT(actualLen <= dataStrLen);
|
||||||
taosArrayPush(rsp.blockDataLen, &actualLen);
|
taosArrayPush(rsp.blockDataLen, &actualLen);
|
||||||
taosArrayPush(rsp.blockData, &buf);
|
taosArrayPush(rsp.blockData, &buf);
|
||||||
|
|
||||||
|
if (pExec->withSchema) {
|
||||||
|
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
|
||||||
|
taosArrayPush(rsp.blockSchema, &pSW);
|
||||||
|
}
|
||||||
|
|
||||||
rsp.blockNum++;
|
rsp.blockNum++;
|
||||||
}
|
}
|
||||||
// db subscribe
|
// db subscribe
|
||||||
|
@ -540,6 +548,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
ASSERT(actualLen <= dataStrLen);
|
ASSERT(actualLen <= dataStrLen);
|
||||||
taosArrayPush(rsp.blockDataLen, &actualLen);
|
taosArrayPush(rsp.blockDataLen, &actualLen);
|
||||||
taosArrayPush(rsp.blockData, &buf);
|
taosArrayPush(rsp.blockData, &buf);
|
||||||
|
|
||||||
|
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
|
||||||
|
taosArrayPush(rsp.blockSchema, &pSW);
|
||||||
|
|
||||||
rsp.blockNum++;
|
rsp.blockNum++;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -586,6 +598,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
// TODO destroy
|
// TODO destroy
|
||||||
taosArrayDestroy(rsp.blockData);
|
taosArrayDestroy(rsp.blockData);
|
||||||
taosArrayDestroy(rsp.blockDataLen);
|
taosArrayDestroy(rsp.blockDataLen);
|
||||||
|
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -827,12 +841,16 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
|
pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
|
||||||
for (int32_t i = 0; i < 5; i++) {
|
for (int32_t i = 0; i < 5; i++) {
|
||||||
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||||
|
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {
|
||||||
.reader = pExec->pExecReader[i],
|
.reader = pExec->pExecReader[i],
|
||||||
.meta = pTq->pVnode->pMeta,
|
.meta = pTq->pVnode->pMeta,
|
||||||
};
|
};
|
||||||
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
|
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
|
||||||
ASSERT(pExec->task[i]);
|
ASSERT(pExec->task[i]);
|
||||||
|
} else {
|
||||||
|
pExec->task[i] = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
|
taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -65,7 +65,9 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
|
||||||
|
|
||||||
/*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/
|
/*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/
|
||||||
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
|
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
|
||||||
ASSERT(pHandle->tbIdHash);
|
if (pHandle->tbIdHash == NULL) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
|
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
|
||||||
if (ret != NULL) {
|
if (ret != NULL) {
|
||||||
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
|
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
|
||||||
|
@ -107,6 +109,27 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
|
||||||
*pNumOfRows = pHandle->pBlock->numOfRows;
|
*pNumOfRows = pHandle->pBlock->numOfRows;
|
||||||
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
|
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
|
||||||
|
|
||||||
|
if (colNumNeed == 0) {
|
||||||
|
*ppCols = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
|
||||||
|
if (*ppCols == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t colMeta = 0;
|
||||||
|
while (colMeta < pSchemaWrapper->nCols) {
|
||||||
|
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
|
||||||
|
SColumnInfoData colInfo = {0};
|
||||||
|
colInfo.info.bytes = pColSchema->bytes;
|
||||||
|
colInfo.info.colId = pColSchema->colId;
|
||||||
|
colInfo.info.type = pColSchema->type;
|
||||||
|
|
||||||
|
if (colInfoDataEnsureCapacity(&colInfo, 0, *pNumOfRows) < 0) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
taosArrayPush(*ppCols, &colInfo);
|
||||||
|
colMeta++;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
if (colNumNeed > pSchemaWrapper->nCols) {
|
if (colNumNeed > pSchemaWrapper->nCols) {
|
||||||
colNumNeed = pSchemaWrapper->nCols;
|
colNumNeed = pSchemaWrapper->nCols;
|
||||||
}
|
}
|
||||||
|
@ -140,6 +163,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
|
||||||
colNeed++;
|
colNeed++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t colActual = taosArrayGetSize(*ppCols);
|
int32_t colActual = taosArrayGetSize(*ppCols);
|
||||||
*pNumOfCols = colActual;
|
*pNumOfCols = colActual;
|
||||||
|
|
Loading…
Reference in New Issue