fix query error
This commit is contained in:
parent
4fda9f98cb
commit
6d7fd79709
|
@ -570,7 +570,6 @@ TEST(testCase, create_topic_Test) {
|
||||||
//taos_close(pConn);
|
//taos_close(pConn);
|
||||||
//}
|
//}
|
||||||
|
|
||||||
#if 0
|
|
||||||
TEST(testCase, tmq_subscribe_Test) {
|
TEST(testCase, tmq_subscribe_Test) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
assert(pConn != NULL);
|
assert(pConn != NULL);
|
||||||
|
@ -595,7 +594,6 @@ TEST(testCase, tmq_subscribe_Test) {
|
||||||
//if (msg == NULL) break;
|
//if (msg == NULL) break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
TEST(testCase, tmq_consume_Test) {
|
TEST(testCase, tmq_consume_Test) {
|
||||||
}
|
}
|
||||||
|
|
|
@ -626,8 +626,10 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerTopic(void** buf, SMqConsumerTopic
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeString(buf, pConsumerTopic->name);
|
tlen += taosEncodeString(buf, pConsumerTopic->name);
|
||||||
tlen += taosEncodeFixedI32(buf, pConsumerTopic->epoch);
|
tlen += taosEncodeFixedI32(buf, pConsumerTopic->epoch);
|
||||||
ASSERT(pConsumerTopic->pVgInfo);
|
int32_t sz = 0;
|
||||||
int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
|
if (pConsumerTopic->pVgInfo != NULL) {
|
||||||
|
sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
|
||||||
|
}
|
||||||
tlen += taosEncodeFixedI32(buf, sz);
|
tlen += taosEncodeFixedI32(buf, sz);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
int32_t* pVgInfo = taosArrayGet(pConsumerTopic->pVgInfo, i);
|
int32_t* pVgInfo = taosArrayGet(pConsumerTopic->pVgInfo, i);
|
||||||
|
|
|
@ -757,12 +757,12 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) {
|
if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) {
|
||||||
pTopic->buffer.lastOffset = pReq->offset;
|
pTopic->buffer.lastOffset = pReq->offset;
|
||||||
}
|
}
|
||||||
// put output into rsp
|
|
||||||
SMqConsumeRsp rsp = {
|
|
||||||
.consumerId = consumerId,
|
|
||||||
.numOfTopics = 1
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
// put output into rsp
|
||||||
|
SMqConsumeRsp rsp = {
|
||||||
|
.consumerId = consumerId,
|
||||||
|
.numOfTopics = 1
|
||||||
|
};
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -822,14 +822,29 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
|
||||||
|
|
||||||
void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) {
|
void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) {
|
||||||
pReadHandle->pMsg = pMsg;
|
pReadHandle->pMsg = pMsg;
|
||||||
|
pMsg->length = htonl(pMsg->length);
|
||||||
|
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
||||||
tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
|
tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
|
||||||
pReadHandle->ver = ver;
|
pReadHandle->ver = ver;
|
||||||
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
|
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tqNextDataBlock(STqReadHandle* pHandle) {
|
bool tqNextDataBlock(STqReadHandle* pHandle) {
|
||||||
while (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock)) {
|
while (1) {
|
||||||
if (pHandle->tbUid == pHandle->pBlock->uid) return true;
|
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (pHandle->pBlock == NULL) return false;
|
||||||
|
|
||||||
|
pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);
|
||||||
|
if (pHandle->tbUid == pHandle->pBlock->uid){
|
||||||
|
pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);
|
||||||
|
pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);
|
||||||
|
pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);
|
||||||
|
pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);
|
||||||
|
pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -845,8 +860,9 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo)
|
||||||
|
|
||||||
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
||||||
int32_t sversion = pHandle->pBlock->sversion;
|
int32_t sversion = pHandle->pBlock->sversion;
|
||||||
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
|
//TODO : change sversion
|
||||||
STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion);
|
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, 0, true);
|
||||||
|
STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, 0);
|
||||||
SArray* pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
|
SArray* pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
|
||||||
if (pArray == NULL) {
|
if (pArray == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
Loading…
Reference in New Issue