Merge pull request #10025 from taosdata/feature/tq

Feature/tq
This commit is contained in:
Liu Jicong 2022-01-26 12:26:49 +08:00 committed by GitHub
commit 3dd00978ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 42 additions and 12 deletions

View File

@ -626,8 +626,10 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerTopic(void** buf, SMqConsumerTopic
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeString(buf, pConsumerTopic->name); tlen += taosEncodeString(buf, pConsumerTopic->name);
tlen += taosEncodeFixedI32(buf, pConsumerTopic->epoch); tlen += taosEncodeFixedI32(buf, pConsumerTopic->epoch);
ASSERT(pConsumerTopic->pVgInfo); int32_t sz = 0;
int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo); if (pConsumerTopic->pVgInfo != NULL) {
sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
}
tlen += taosEncodeFixedI32(buf, sz); tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int32_t* pVgInfo = taosArrayGet(pConsumerTopic->pVgInfo, i); int32_t* pVgInfo = taosArrayGet(pConsumerTopic->pVgInfo, i);

View File

@ -757,12 +757,12 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) { if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) {
pTopic->buffer.lastOffset = pReq->offset; pTopic->buffer.lastOffset = pReq->offset;
} }
// put output into rsp
SMqConsumeRsp rsp = {
.consumerId = consumerId,
.numOfTopics = 1
};
} }
// put output into rsp
SMqConsumeRsp rsp = {
.consumerId = consumerId,
.numOfTopics = 1
};
return 0; return 0;
} }
@ -822,14 +822,29 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) { void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) {
pReadHandle->pMsg = pMsg; pReadHandle->pMsg = pMsg;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter); tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
pReadHandle->ver = ver; pReadHandle->ver = ver;
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter)); memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
} }
bool tqNextDataBlock(STqReadHandle* pHandle) { bool tqNextDataBlock(STqReadHandle* pHandle) {
while (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock)) { while (1) {
if (pHandle->tbUid == pHandle->pBlock->uid) return true; if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false;
}
if (pHandle->pBlock == NULL) return false;
pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);
if (pHandle->tbUid == pHandle->pBlock->uid){
pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);
pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);
pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);
pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);
pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);
return true;
}
} }
return false; return false;
} }
@ -845,8 +860,18 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo)
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
int32_t sversion = pHandle->pBlock->sversion; int32_t sversion = pHandle->pBlock->sversion;
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true); //TODO : change sversion
STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion); STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, 0);
tb_uid_t quid;
STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pMeta, pHandle->pBlock->uid);
if (pTbCfg->type == META_CHILD_TABLE) {
quid = pTbCfg->ctbCfg.suid;
} else {
quid = pHandle->pBlock->uid;
}
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, quid, 0, true);
SArray* pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData)); SArray* pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
if (pArray == NULL) { if (pArray == NULL) {
return NULL; return NULL;

View File

@ -5468,6 +5468,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp
return NULL; return NULL;
} }
// todo dynamic set the value of 4096
pInfo->pRes = createOutputBuf_rv(pExprInfo, 4096);
int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprInfo); int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprInfo);
SArray* pColList = taosArrayInit(numOfOutput, sizeof(int32_t)); SArray* pColList = taosArrayInit(numOfOutput, sizeof(int32_t));
for(int32_t i = 0; i < numOfOutput; ++i) { for(int32_t i = 0; i < numOfOutput; ++i) {
@ -7432,7 +7435,7 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SeqTableTagScan"; pOperator->name = "SeqTableTagScan";
// pOperator->operatorType = OP_TagScan; pOperator->operatorType = OP_TagScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;