fix: fix tmq result parse
This commit is contained in:
parent
79bc579779
commit
0fe0d6fc77
|
@ -19,8 +19,20 @@
|
|||
#include <time.h>
|
||||
#include "taos.h"
|
||||
|
||||
static int running = 1;
|
||||
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
|
||||
static int running = 1;
|
||||
static void msg_process(TAOS_RES* msg) {
|
||||
char buf[1024];
|
||||
printf("topic: %s\n", tmq_get_topic_name(msg));
|
||||
printf("vg:%d\n", tmq_get_vgroup_id(msg));
|
||||
while (1) {
|
||||
TAOS_ROW row = taos_fetch_row(msg);
|
||||
if (row == NULL) break;
|
||||
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
||||
int32_t numOfFields = taos_field_count(msg);
|
||||
taos_print_row(buf, row, fields, numOfFields);
|
||||
printf("%s\n", buf);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t init_env() {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
|
@ -42,8 +54,7 @@ int32_t init_env() {
|
|||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes =
|
||||
taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)");
|
||||
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c4 int) tags(t1 int)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
@ -90,7 +101,7 @@ int32_t create_topic() {
|
|||
|
||||
/*const char* sql = "select * from tu1";*/
|
||||
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/
|
||||
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1 from ct1");
|
||||
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c4 from ct1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
@ -200,7 +211,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
|||
while (running) {
|
||||
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
|
||||
if (tmqmessage) {
|
||||
/*msg_process(tmqmessage);*/
|
||||
msg_process(tmqmessage);
|
||||
tmq_message_destroy(tmqmessage);
|
||||
|
||||
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
|
||||
|
|
|
@ -2365,11 +2365,10 @@ typedef struct {
|
|||
} SMqSubVgEp;
|
||||
|
||||
typedef struct {
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
int8_t isSchemaAdaptive;
|
||||
SArray* vgs; // SArray<SMqSubVgEp>
|
||||
int32_t numOfFields;
|
||||
TAOS_FIELD* fields;
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
int8_t isSchemaAdaptive;
|
||||
SArray* vgs; // SArray<SMqSubVgEp>
|
||||
SSchemaWrapper schema;
|
||||
} SMqSubTopicEp;
|
||||
|
||||
typedef struct {
|
||||
|
@ -2468,8 +2467,7 @@ static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp
|
|||
SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i);
|
||||
tlen += tEncodeSMqSubVgEp(buf, pVgEp);
|
||||
}
|
||||
tlen += taosEncodeFixedI32(buf, pTopicEp->numOfFields);
|
||||
// tlen += taosEncodeBinary(buf, pTopicEp->fields, pTopicEp->numOfFields * sizeof(TAOS_FIELD));
|
||||
tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
|
@ -2487,8 +2485,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
|
|||
buf = tDecodeSMqSubVgEp(buf, &vgEp);
|
||||
taosArrayPush(pTopicEp->vgs, &vgEp);
|
||||
}
|
||||
buf = taosDecodeFixedI32(buf, &pTopicEp->numOfFields);
|
||||
// buf = taosDecodeBinary(buf, (void**)&pTopicEp->fields, pTopicEp->numOfFields * sizeof(TAOS_FIELD));
|
||||
buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema);
|
||||
return buf;
|
||||
}
|
||||
|
||||
|
|
|
@ -171,21 +171,25 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
|||
return doFetchRow(pRequest, true, true);
|
||||
|
||||
} else if (TD_RES_TMQ(res)) {
|
||||
SMqRspObj *msg = ((SMqRspObj *)res);
|
||||
SMqRspObj *msg = ((SMqRspObj *)res);
|
||||
if (msg->resIter == -1) msg->resIter++;
|
||||
SReqResultInfo *pResultInfo = taosArrayGet(msg->res, msg->resIter);
|
||||
|
||||
doSetOneRowPtr(pResultInfo);
|
||||
pResultInfo->current += 1;
|
||||
|
||||
if (pResultInfo->row == NULL) {
|
||||
msg->resIter++;
|
||||
pResultInfo = taosArrayGet(msg->res, msg->resIter);
|
||||
if (pResultInfo->current < pResultInfo->numOfRows) {
|
||||
doSetOneRowPtr(pResultInfo);
|
||||
pResultInfo->current += 1;
|
||||
return pResultInfo->row;
|
||||
} else {
|
||||
msg->resIter++;
|
||||
if (msg->resIter < taosArrayGetSize(msg->res)) {
|
||||
pResultInfo = taosArrayGet(msg->res, msg->resIter);
|
||||
doSetOneRowPtr(pResultInfo);
|
||||
pResultInfo->current += 1;
|
||||
return pResultInfo->row;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return pResultInfo->row;
|
||||
|
||||
} else {
|
||||
// assert to avoid uninitialization error
|
||||
ASSERT(0);
|
||||
|
|
|
@ -119,14 +119,14 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
// subscribe info
|
||||
int32_t sqlLen;
|
||||
char* sql;
|
||||
char* topicName;
|
||||
int64_t topicId;
|
||||
SArray* vgs; // SArray<SMqClientVg>
|
||||
int8_t isSchemaAdaptive;
|
||||
int32_t numOfFields;
|
||||
TAOS_FIELD* fields;
|
||||
int32_t sqlLen;
|
||||
char* sql;
|
||||
char* topicName;
|
||||
int64_t topicId;
|
||||
SArray* vgs; // SArray<SMqClientVg>
|
||||
int8_t isSchemaAdaptive;
|
||||
int32_t numOfFields;
|
||||
SSchemaWrapper schema;
|
||||
} SMqClientTopic;
|
||||
|
||||
typedef struct {
|
||||
|
@ -956,6 +956,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
|
|||
for (int32_t i = 0; i < topicNumGet; i++) {
|
||||
SMqClientTopic topic = {0};
|
||||
SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
|
||||
topic.schema = pTopicEp->schema;
|
||||
taosHashClear(pHash);
|
||||
topic.topicName = strdup(pTopicEp->topic);
|
||||
|
||||
|
@ -1191,7 +1192,10 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
|||
for (int32_t i = 0; i < blockNum; i++) {
|
||||
int32_t pos = *(int32_t*)taosArrayGet(pRsp->blockPos, i);
|
||||
SRetrieveTableRsp* pRetrieve = POINTER_SHIFT(pRsp->blockData, pos);
|
||||
SReqResultInfo resInfo;
|
||||
SReqResultInfo resInfo = {0};
|
||||
resInfo.totalRows = 0;
|
||||
resInfo.precision = TSDB_TIME_PRECISION_MILLI;
|
||||
setResSchemaInfo(&resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
|
||||
setQueryResultFromRsp(&resInfo, pRetrieve, true);
|
||||
taosArrayPush(pRspObj->res, &resInfo);
|
||||
}
|
||||
|
@ -1386,7 +1390,7 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
|
|||
rspWrapper = NULL;
|
||||
continue;
|
||||
}
|
||||
// build msg
|
||||
// build rsp
|
||||
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
|
||||
return pRsp;
|
||||
} else {
|
||||
|
|
|
@ -60,8 +60,10 @@ static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg);
|
|||
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
|
||||
const SMqConsumerEp *pConsumerEp);
|
||||
|
||||
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* topicName);
|
||||
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* oldTopicName);
|
||||
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,
|
||||
const char *topicName);
|
||||
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,
|
||||
const char *oldTopicName);
|
||||
|
||||
int32_t mndInitSubscribe(SMnode *pMnode) {
|
||||
SSdbTable table = {.sdbType = SDB_SUBSCRIBE,
|
||||
|
@ -102,7 +104,8 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
|
|||
return pSub;
|
||||
}
|
||||
|
||||
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp, const char* topicName) {
|
||||
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp,
|
||||
const char *topicName) {
|
||||
SMqMVRebReq req = {
|
||||
.vgId = pConsumerEp->vgId,
|
||||
.oldConsumerId = pConsumerEp->oldConsumerId,
|
||||
|
@ -131,7 +134,8 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* topicName) {
|
||||
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,
|
||||
const char *topicName) {
|
||||
ASSERT(pConsumerEp->oldConsumerId != -1);
|
||||
|
||||
void *buf;
|
||||
|
@ -158,7 +162,8 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp, const char* oldTopicName) {
|
||||
static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp,
|
||||
const char *oldTopicName) {
|
||||
SMqCancelConnReq req = {0};
|
||||
req.consumerId = pConsumerEp->consumerId;
|
||||
req.vgId = pConsumerEp->vgId;
|
||||
|
@ -182,7 +187,8 @@ static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsum
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* oldTopicName) {
|
||||
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,
|
||||
const char *oldTopicName) {
|
||||
void *buf;
|
||||
int32_t tlen;
|
||||
if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp, oldTopicName) < 0) {
|
||||
|
@ -219,13 +225,14 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
|
|||
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
//TODO add lock
|
||||
// TODO add lock
|
||||
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
|
||||
int32_t serverEpoch = pConsumer->epoch;
|
||||
int32_t serverEpoch = pConsumer->epoch;
|
||||
|
||||
// TODO
|
||||
int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);
|
||||
mDebug("consumer %ld epoch(%d) try to get sub ep, server epoch %d, old val: %d", consumerId, epoch, serverEpoch, hbStatus);
|
||||
mDebug("consumer %ld epoch(%d) try to get sub ep, server epoch %d, old val: %d", consumerId, epoch, serverEpoch,
|
||||
hbStatus);
|
||||
atomic_store_32(&pConsumer->hbStatus, 0);
|
||||
/*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
|
||||
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
|
||||
|
@ -233,7 +240,8 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
|
|||
|
||||
strcpy(rsp.cgroup, pReq->cgroup);
|
||||
if (epoch != serverEpoch) {
|
||||
mInfo("send new assignment to consumer %ld, consumer epoch %d, server epoch %d", pConsumer->consumerId, epoch, serverEpoch);
|
||||
mInfo("send new assignment to consumer %ld, consumer epoch %d, server epoch %d", pConsumer->consumerId, epoch,
|
||||
serverEpoch);
|
||||
mDebug("consumer %ld try r lock", consumerId);
|
||||
taosRLockLatch(&pConsumer->lock);
|
||||
mDebug("consumer %ld r locked", consumerId);
|
||||
|
@ -251,8 +259,15 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
|
|||
if (consumerId == pSubConsumer->consumerId) {
|
||||
int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
|
||||
mInfo("topic %s has %d vg", topicName, serverEpoch);
|
||||
|
||||
SMqSubTopicEp topicEp;
|
||||
strcpy(topicEp.topic, topicName);
|
||||
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topicName);
|
||||
ASSERT(pTopic != NULL);
|
||||
topicEp.schema = pTopic->schema;
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
||||
topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
|
||||
for (int32_t k = 0; k < vgsz; k++) {
|
||||
char offsetKey[TSDB_PARTITION_KEY_LEN];
|
||||
|
@ -409,7 +424,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
|
|||
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key);
|
||||
taosMemoryFreeClear(pRebSub->key);
|
||||
|
||||
mInfo("mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d", pSub->key, pSub->vgNum, (int32_t)taosArrayGetSize(pSub->unassignedVg));
|
||||
mInfo("mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d", pSub->key, pSub->vgNum,
|
||||
(int32_t)taosArrayGetSize(pSub->unassignedVg));
|
||||
|
||||
// remove lost consumer
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) {
|
||||
|
@ -459,12 +475,12 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
|
|||
mDebug("consumer %ld try w lock", pRebConsumer->consumerId);
|
||||
taosWLockLatch(&pRebConsumer->lock);
|
||||
mDebug("consumer %ld w locked", pRebConsumer->consumerId);
|
||||
int32_t status = atomic_load_32(&pRebConsumer->status);
|
||||
int32_t status = atomic_load_32(&pRebConsumer->status);
|
||||
if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
|
||||
(vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
|
||||
(vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) {
|
||||
/*if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {*/
|
||||
/*pRebConsumer->epoch++;*/
|
||||
/*pRebConsumer->epoch++;*/
|
||||
/*}*/
|
||||
if (vgThisConsumerAfterRb != 0) {
|
||||
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
|
||||
|
@ -500,7 +516,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
|
|||
|
||||
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
|
||||
pConsumerEp->consumerId = pSubConsumer->consumerId;
|
||||
//TODO
|
||||
// TODO
|
||||
pConsumerEp->epoch = 0;
|
||||
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
|
||||
|
||||
|
|
|
@ -551,30 +551,40 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
rspV2.rspOffset = fetchOffset;
|
||||
|
||||
int32_t blockSz = taosArrayGetSize(pRes);
|
||||
int32_t tlen = 0;
|
||||
int32_t dataBlockStrLen = 0;
|
||||
for (int32_t i = 0; i < blockSz; i++) {
|
||||
SSDataBlock* pBlock = taosArrayGet(pRes, i);
|
||||
tlen += sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
|
||||
dataBlockStrLen += sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
|
||||
}
|
||||
|
||||
void* data = taosMemoryMalloc(tlen);
|
||||
if (data == NULL) {
|
||||
void* dataBlockBuf = taosMemoryMalloc(dataBlockStrLen);
|
||||
if (dataBlockBuf == NULL) {
|
||||
pMsg->code = -1;
|
||||
taosMemoryFree(pHead);
|
||||
}
|
||||
|
||||
rspV2.blockData = data;
|
||||
rspV2.blockData = dataBlockBuf;
|
||||
|
||||
void* dataBlockBuf = data;
|
||||
int32_t pos;
|
||||
rspV2.blockPos = taosArrayInit(blockSz, sizeof(int32_t));
|
||||
for (int32_t i = 0; i < blockSz; i++) {
|
||||
pos = 0;
|
||||
SSDataBlock* pBlock = taosArrayGet(pRes, i);
|
||||
blockCompressEncode(pBlock, dataBlockBuf, &pos, pBlock->info.numOfCols, false);
|
||||
SSDataBlock* pBlock = taosArrayGet(pRes, i);
|
||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)dataBlockBuf;
|
||||
pRetrieve->useconds = 0;
|
||||
pRetrieve->precision = 0;
|
||||
pRetrieve->compressed = 0;
|
||||
pRetrieve->completed = 1;
|
||||
pRetrieve->numOfRows = htonl(pBlock->info.rows);
|
||||
blockCompressEncode(pBlock, pRetrieve->data, &pos, pBlock->info.numOfCols, false);
|
||||
taosArrayPush(rspV2.blockPos, &rspV2.dataLen);
|
||||
rspV2.dataLen += pos;
|
||||
dataBlockBuf = POINTER_SHIFT(dataBlockBuf, pos);
|
||||
|
||||
int32_t totLen = sizeof(SRetrieveTableRsp) + pos;
|
||||
pRetrieve->compLen = htonl(totLen);
|
||||
rspV2.dataLen += totLen;
|
||||
dataBlockBuf = POINTER_SHIFT(dataBlockBuf, totLen);
|
||||
}
|
||||
ASSERT(POINTER_DISTANCE(dataBlockBuf, rspV2.blockData) <= dataBlockStrLen);
|
||||
|
||||
int32_t msgLen = sizeof(SMqRspHead) + tEncodeSMqPollRspV2(NULL, &rspV2);
|
||||
void* buf = rpcMallocCont(msgLen);
|
||||
|
@ -590,7 +600,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
|
||||
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
|
||||
pMsg->pCont = buf;
|
||||
pMsg->contLen = tlen;
|
||||
pMsg->contLen = msgLen;
|
||||
pMsg->code = 0;
|
||||
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset,
|
||||
pHead->msgType, consumerId, pReq->epoch);
|
||||
|
|
Loading…
Reference in New Issue