Merge pull request #11831 from taosdata/feature/tq

enh: msg with adaptive schema
This commit is contained in:
Liu Jicong 2022-04-25 17:37:55 +08:00 committed by GitHub
commit ee9084e456
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 210 additions and 205 deletions

View File

@ -60,7 +60,6 @@ int32_t init_env() {
pRes =
taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)");
if (taos_errno(pRes) != 0) {
assert(0);
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
@ -104,8 +103,8 @@ int32_t create_topic() {
}
taos_free_result(pRes);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");
pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
@ -163,9 +162,10 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "td.connect.db", "abc1");
/*tmq_conf_set(conf, "td.connect.db", "abc1");*/
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
return tmq;
}

View File

@ -2069,80 +2069,6 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
return buf;
}
typedef struct {
int64_t leftForVer;
int32_t vgId;
int32_t epoch;
int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN];
} SMqCancelConnReq;
static FORCE_INLINE int32_t tEncodeSMqCancelConnReq(void** buf, const SMqCancelConnReq* pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->leftForVer);
tlen += taosEncodeFixedI32(buf, pReq->vgId);
tlen += taosEncodeFixedI32(buf, pReq->epoch);
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
tlen += taosEncodeString(buf, pReq->topicName);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqCancelConnReq(void* buf, SMqCancelConnReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->leftForVer);
buf = taosDecodeFixedI32(buf, &pReq->vgId);
buf = taosDecodeFixedI32(buf, &pReq->epoch);
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeStringTo(buf, pReq->topicName);
return buf;
}
typedef struct {
int8_t reserved;
} SMqCancelConnRsp;
typedef struct {
int64_t leftForVer;
int32_t vgId;
int64_t oldConsumerId;
int64_t newConsumerId;
char* topic;
} SMqMVRebReq;
static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->leftForVer);
tlen += taosEncodeFixedI32(buf, pReq->vgId);
tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId);
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
tlen += taosEncodeString(buf, pReq->topic);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->leftForVer);
buf = taosDecodeFixedI32(buf, &pReq->vgId);
buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId);
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
buf = taosDecodeString(buf, &pReq->topic);
return buf;
}
typedef struct {
SMsgHead header;
int32_t vgId;
int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
} SMqSetCVgRsp;
typedef struct {
SMsgHead header;
int32_t vgId;
int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
} SMqMVRebRsp;
typedef struct {
int32_t vgId;
int64_t offset;
@ -2169,6 +2095,24 @@ typedef struct {
SSchema* pSchema;
} SSchemaWrapper;
static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
if (pSW == NULL) return pSW;
pSW->nCols = pSchemaWrapper->nCols;
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) {
taosMemoryFree(pSW);
return NULL;
}
memcpy(pSW->pSchema, pSchemaWrapper->pSchema, pSW->nCols * sizeof(SSchema));
return pSW;
}
static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) {
taosMemoryFree(pSchemaWrapper->pSchema);
taosMemoryFree(pSchemaWrapper);
}
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pSchema->type);
@ -2179,13 +2123,13 @@ static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema
return tlen;
}
static FORCE_INLINE void* taosDecodeSSchema(void* buf, SSchema* pSchema) {
static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) {
buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosDecodeFixedI8(buf, &pSchema->flags);
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeFixedI16(buf, &pSchema->colId);
buf = taosDecodeStringTo(buf, pSchema->name);
return buf;
return (void*)buf;
}
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) {
@ -2215,7 +2159,7 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr
return tlen;
}
static FORCE_INLINE void* taosDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) {
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
buf = taosDecodeFixedU32(buf, &pSW->nCols);
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) {
@ -2225,7 +2169,7 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pS
for (int32_t i = 0; i < pSW->nCols; i++) {
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
}
return buf;
return (void*)buf;
}
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchemaWrapper* pSW) {
@ -2632,6 +2576,10 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp
void* data = taosArrayGetP(pRsp->blockData, i);
tlen += taosEncodeFixedI32(buf, bLen);
tlen += taosEncodeBinary(buf, data, bLen);
if (pRsp->withSchema) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i);
tlen += taosEncodeSSchemaWrapper(buf, pSW);
}
}
}
return tlen;
@ -2644,6 +2592,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
if (pRsp->blockNum != 0) {
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
@ -2656,6 +2605,11 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf = taosDecodeBinary(buf, &data, bLen);
taosArrayPush(pRsp->blockDataLen, &bLen);
taosArrayPush(pRsp->blockData, &data);
if (pRsp->withSchema) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
buf = taosDecodeSSchemaWrapper(buf, pSW);
taosArrayPush(pRsp->blockSchema, &pSW);
}
}
}
return (void*)buf;

View File

@ -231,6 +231,10 @@ static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool conver
msg->resIter++;
if (msg->resIter < msg->rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(msg->rsp.blockData, msg->resIter);
if (msg->rsp.withSchema) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(msg->rsp.blockSchema, msg->resIter);
setResSchemaInfo(&msg->resInfo, pSW->pSchema, pSW->nCols);
}
setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4);
return &msg->resInfo;
}

View File

@ -14,9 +14,9 @@
*/
#include "catalog.h"
#include "scheduler.h"
#include "clientInt.h"
#include "clientLog.h"
#include "scheduler.h"
#include "trpc.h"
static SClientHbMgr clientHbMgr = {0};
@ -110,7 +110,8 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
if (NULL == info) {
tscWarn("fail to get connInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid, pRsp->connKey.connType);
tscWarn("fail to get connInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
pRsp->connKey.connType);
return TSDB_CODE_SUCCESS;
}
@ -121,7 +122,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
} else {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
pTscObj->connId = pRsp->query->connId;
if (pRsp->query->killRid) {
SRequestObj *pRequest = acquireRequest(pRsp->query->killRid);
if (NULL == pRequest) {
@ -131,7 +132,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
releaseRequest(pRsp->query->killRid);
}
}
if (pRsp->query->killConnection) {
taos_close(pTscObj);
}
@ -139,7 +140,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
releaseTscObj(pRsp->connKey.tscRid);
}
}
int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
tscDebug("hb got %d rsp kv", kvNum);
@ -236,24 +237,24 @@ static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code)
}
int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
int64_t now = taosGetTimestampUs();
int64_t now = taosGetTimestampUs();
SQueryDesc desc = {0};
int32_t code = 0;
int32_t code = 0;
void *pIter = taosHashIterate(pObj->pRequests, NULL);
void *pIter = taosHashIterate(pObj->pRequests, NULL);
while (pIter != NULL) {
int64_t *rid = pIter;
int64_t *rid = pIter;
SRequestObj *pRequest = acquireRequest(*rid);
if (NULL == pRequest) {
continue;
}
tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
desc.stime = pRequest->metric.start;
desc.queryId = pRequest->requestId;
desc.stime = pRequest->metric.start;
desc.queryId = pRequest->requestId;
desc.useconds = now - pRequest->metric.start;
desc.reqRid = pRequest->self;
desc.pid = hbBasic->pid;
desc.reqRid = pRequest->self;
desc.pid = hbBasic->pid;
taosGetFqdn(desc.fqdn);
desc.subPlanNum = pRequest->body.pDag ? pRequest->body.pDag->numOfSubplans : 0;
@ -271,9 +272,9 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
}
}
releaseRequest(*rid);
releaseRequest(*rid);
taosArrayPush(hbBasic->queryDesc, &desc);
pIter = taosHashIterate(pObj->pRequests, pIter);
}
@ -286,14 +287,14 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
return TSDB_CODE_QRY_APP_ERROR;
}
int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
if (numOfQueries <= 0) {
releaseTscObj(connKey->tscRid);
tscDebug("no queries on connection");
return TSDB_CODE_QRY_APP_ERROR;
}
SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
if (NULL == hbBasic) {
tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
@ -308,7 +309,7 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
taosMemoryFree(hbBasic);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
hbBasic->connId = pTscObj->connId;
hbBasic->pid = taosGetPId();
taosGetAppName(hbBasic->app, NULL);
@ -405,7 +406,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
}
hbGetQueryBasicInfo(connKey, req);
code = hbGetExpiredDBInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
return code;
@ -471,10 +472,10 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
}
// if (code) {
// taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq);
// taosMemoryFreeClear(pBatchReq);
// }
// if (code) {
// taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq);
// taosMemoryFreeClear(pBatchReq);
// }
return pBatchReq;
}
@ -630,24 +631,23 @@ void appHbMgrCleanup(void) {
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
for (int i = 0; i < sz; i++) {
SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
void *pIter = taosHashIterate(pTarget->activeInfo, NULL);
void *pIter = taosHashIterate(pTarget->activeInfo, NULL);
while (pIter != NULL) {
SClientHbReq *pOneReq = pIter;
hbFreeReq(pOneReq);
taosHashCleanup(pOneReq->info);
pIter = taosHashIterate(pTarget->activeInfo, pIter);
}
}
taosHashCleanup(pTarget->activeInfo);
pTarget->activeInfo = NULL;
pIter = taosHashIterate(pTarget->connInfo, NULL);
while (pIter != NULL) {
SHbConnInfo *info = pIter;
taosMemoryFree(info->param);
pIter = taosHashIterate(pTarget->connInfo, pIter);
}
}
taosHashCleanup(pTarget->connInfo);
pTarget->connInfo = NULL;
@ -668,13 +668,13 @@ int hbMgrInit() {
hbMgrInitHandle();
// init backgroud thread
//hbCreateThread();
/*hbCreateThread();*/
return 0;
}
void hbMgrCleanUp() {
//hbStopThread();
// hbStopThread();
// destroy all appHbMgr
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
@ -747,11 +747,11 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
taosMemoryFree(info->param);
taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
}
if (NULL == pReq || NULL == info) {
return;
}
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
}

View File

@ -226,17 +226,15 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
pRequest->type = pQuery->msgType;
SPlanContext cxt = {
.queryId = pRequest->requestId,
.acctId = pRequest->pTscObj->acctId,
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
.pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite,
.pTransporter = pRequest->pTscObj->pAppInfo->pTransporter,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
};
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
SPlanContext cxt = {.queryId = pRequest->requestId,
.acctId = pRequest->pTscObj->acctId,
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
.pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite,
.pTransporter = pRequest->pTscObj->pAppInfo->pTransporter,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
if (TSDB_CODE_SUCCESS == code) {
code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
}
@ -247,6 +245,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
ASSERT(pSchema != NULL && numOfCols > 0);
pResInfo->numOfCols = numOfCols;
// TODO handle memory leak
pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
@ -282,7 +281,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
pRequest->metric.start, &res);
pRequest->metric.start, &res);
if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
@ -840,12 +839,12 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
return code;
}
char* p = (char*) pResultInfo->pData;
char* p = (char*)pResultInfo->pData;
int32_t dataLen = *(int32_t*) p;
int32_t dataLen = *(int32_t*)p;
p += sizeof(int32_t);
uint64_t groupId = *(uint64_t*) p;
uint64_t groupId = *(uint64_t*)p;
p += sizeof(uint64_t);
int32_t* colLength = (int32_t*)p;

View File

@ -376,7 +376,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
ASSERT(user);
ASSERT(pass);
ASSERT(conf->db);
/*ASSERT(conf->db);*/
ASSERT(conf->groupId[0]);
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
@ -1118,7 +1118,9 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
if (!pWrapper->msg.withSchema) {
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
}
taosFreeQitem(pWrapper);
return pRspObj;

View File

@ -476,33 +476,37 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
SSdb* pSdb = pMnode->pSdb;
SVgObj* pVgroup = NULL;
SQueryPlan* pPlan = qStringToQueryPlan(pTopic->physicalPlan);
if (pPlan == NULL) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
SQueryPlan* pPlan = NULL;
SSubplan* plan = NULL;
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
pPlan = qStringToQueryPlan(pTopic->physicalPlan);
if (pPlan == NULL) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
ASSERT(pSub->vgNum == -1);
pSub->vgNum = 0;
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
if (levelNum != 1) {
qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
return -1;
}
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);
int32_t opNum = LIST_LENGTH(inner->pNodeList);
if (opNum != 1) {
qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
return -1;
}
plan = nodesListGetNode(inner->pNodeList, 0);
}
ASSERT(pSub->vgNum == -1);
pSub->vgNum = 0;
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
if (levelNum != 1) {
qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
return -1;
}
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);
int32_t opNum = LIST_LENGTH(inner->pNodeList);
if (opNum != 1) {
qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
return -1;
}
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
int64_t unexistKey = -1;
SMqConsumerEpInSub* pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
ASSERT(pEpInSub);
@ -519,38 +523,35 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
}
pSub->vgNum++;
plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
pVgEp->epSet = plan->execNode.epSet;
pVgEp->vgId = plan->execNode.nodeId;
#if 0
SMqConsumerEp consumerEp = {0};
consumerEp.status = 0;
consumerEp.consumerId = -1;
consumerEp.epSet = plan->execNode.epSet;
consumerEp.vgId = plan->execNode.nodeId;
#endif
pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
pVgEp->vgId = pVgroup->vgId;
taosArrayPush(pEpInSub->vgs, &pVgEp);
mDebug("init subscribption %s, assign vg: %d", pSub->key, pVgEp->vgId);
int32_t msgLen;
if (qSubPlanToString(plan, &pVgEp->qmsg, &msgLen) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
int32_t msgLen;
plan->execNode.epSet = pVgEp->epSet;
plan->execNode.nodeId = pVgEp->vgId;
if (qSubPlanToString(plan, &pVgEp->qmsg, &msgLen) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
} else {
pVgEp->qmsg = strdup("");
}
taosArrayPush(pEpInSub->vgs, &pVgEp);
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
/*taosArrayPush(pSub->unassignedVg, &consumerEp);*/
}
ASSERT(pEpInSub->vgs->size > 0);
pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
ASSERT(pEpInSub->vgs->size > 0);

View File

@ -282,10 +282,10 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
topicObj.version = 1;
topicObj.sql = strdup(pCreate->sql);
topicObj.sqlLen = strlen(pCreate->sql) + 1;
topicObj.ast = strdup(pCreate->ast);
topicObj.astLen = strlen(pCreate->ast) + 1;
if (pCreate->ast && pCreate->ast[0]) {
topicObj.ast = strdup(pCreate->ast);
topicObj.astLen = strlen(pCreate->ast) + 1;
topicObj.subType = TOPIC_SUB_TYPE__TABLE;
topicObj.withTbName = 0;
topicObj.withSchema = 0;
@ -314,6 +314,9 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
return -1;
}
} else {
topicObj.ast = strdup("");
topicObj.astLen = 1;
topicObj.physicalPlan = strdup("");
topicObj.subType = TOPIC_SUB_TYPE__DB;
topicObj.withTbName = 1;
topicObj.withSchema = 1;

View File

@ -411,8 +411,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pReq->currentOffset;
rsp.withSchema = pExec->withSchema;
rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
while (1) {
consumerEpoch = atomic_load_32(&pExec->epoch);
@ -512,6 +514,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf);
if (pExec->withSchema) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
taosArrayPush(rsp.blockSchema, &pSW);
}
rsp.blockNum++;
}
// db subscribe
@ -540,6 +548,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf);
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
taosArrayPush(rsp.blockSchema, &pSW);
rsp.blockNum++;
}
} else {
@ -586,6 +598,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// TODO destroy
taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen);
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
return 0;
}
@ -827,12 +841,16 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
for (int32_t i = 0; i < 5; i++) {
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = {
.reader = pExec->pExecReader[i],
.meta = pTq->pVnode->pMeta,
};
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
ASSERT(pExec->task[i]);
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
SReadHandle handle = {
.reader = pExec->pExecReader[i],
.meta = pTq->pVnode->pMeta,
};
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
ASSERT(pExec->task[i]);
} else {
pExec->task[i] = NULL;
}
}
taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
return 0;

View File

@ -65,7 +65,9 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
/*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
ASSERT(pHandle->tbIdHash);
if (pHandle->tbIdHash == NULL) {
return true;
}
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
if (ret != NULL) {
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
@ -107,26 +109,15 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
*pNumOfRows = pHandle->pBlock->numOfRows;
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
if (colNumNeed > pSchemaWrapper->nCols) {
colNumNeed = pSchemaWrapper->nCols;
}
if (colNumNeed == 0) {
*ppCols = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
if (*ppCols == NULL) {
return -1;
}
*ppCols = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
if (*ppCols == NULL) {
return -1;
}
int32_t colMeta = 0;
int32_t colNeed = 0;
while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
col_id_t colIdSchema = pColSchema->colId;
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pHandle->pColIdList, colNeed);
if (colIdSchema < colIdNeed) {
colMeta++;
} else if (colIdSchema > colIdNeed) {
colNeed++;
} else {
int32_t colMeta = 0;
while (colMeta < pSchemaWrapper->nCols) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
SColumnInfoData colInfo = {0};
colInfo.info.bytes = pColSchema->bytes;
colInfo.info.colId = pColSchema->colId;
@ -137,7 +128,40 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
}
taosArrayPush(*ppCols, &colInfo);
colMeta++;
colNeed++;
}
} else {
if (colNumNeed > pSchemaWrapper->nCols) {
colNumNeed = pSchemaWrapper->nCols;
}
*ppCols = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
if (*ppCols == NULL) {
return -1;
}
int32_t colMeta = 0;
int32_t colNeed = 0;
while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
col_id_t colIdSchema = pColSchema->colId;
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pHandle->pColIdList, colNeed);
if (colIdSchema < colIdNeed) {
colMeta++;
} else if (colIdSchema > colIdNeed) {
colNeed++;
} else {
SColumnInfoData colInfo = {0};
colInfo.info.bytes = pColSchema->bytes;
colInfo.info.colId = pColSchema->colId;
colInfo.info.type = pColSchema->type;
if (colInfoDataEnsureCapacity(&colInfo, 0, *pNumOfRows) < 0) {
goto FAIL;
}
taosArrayPush(*ppCols, &colInfo);
colMeta++;
colNeed++;
}
}
}