Merge pull request #10331 from taosdata/feature/tq
extract convert to set msg
This commit is contained in:
commit
c5854b83d1
|
@ -333,6 +333,8 @@ do { \
|
||||||
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
|
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
|
||||||
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
|
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
|
||||||
|
|
||||||
|
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
|
||||||
|
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
|
||||||
|
|
||||||
#define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default
|
#define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default
|
||||||
|
|
||||||
|
|
|
@ -75,7 +75,7 @@ struct tmq_message_t {
|
||||||
SMqConsumeRsp rsp;
|
SMqConsumeRsp rsp;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct SMqClientVg {
|
typedef struct {
|
||||||
// statistics
|
// statistics
|
||||||
int64_t pollCnt;
|
int64_t pollCnt;
|
||||||
// offset
|
// offset
|
||||||
|
@ -86,7 +86,7 @@ typedef struct SMqClientVg {
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
} SMqClientVg;
|
} SMqClientVg;
|
||||||
|
|
||||||
typedef struct SMqClientTopic {
|
typedef struct {
|
||||||
// subscribe info
|
// subscribe info
|
||||||
int32_t sqlLen;
|
int32_t sqlLen;
|
||||||
char* sql;
|
char* sql;
|
||||||
|
@ -779,11 +779,9 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
||||||
param->pVg = pVg;
|
param->pVg = pVg;
|
||||||
tsem_init(¶m->rspSem, 0, 0);
|
tsem_init(¶m->rspSem, 0, 0);
|
||||||
|
|
||||||
|
|
||||||
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
|
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
|
||||||
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL};
|
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL};
|
||||||
|
|
||||||
|
|
||||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
||||||
sendInfo->requestObjRefId = 0;
|
sendInfo->requestObjRefId = 0;
|
||||||
sendInfo->param = param;
|
sendInfo->param = param;
|
||||||
|
|
|
@ -34,6 +34,7 @@ int32_t tInitSubmitMsgIter(SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pIter->totalLen = pMsg->length;
|
pIter->totalLen = pMsg->length;
|
||||||
|
ASSERT(pIter->totalLen > 0);
|
||||||
pIter->len = 0;
|
pIter->len = 0;
|
||||||
pIter->pMsg = pMsg;
|
pIter->pMsg = pMsg;
|
||||||
if (pMsg->length <= sizeof(SSubmitReq)) {
|
if (pMsg->length <= sizeof(SSubmitReq)) {
|
||||||
|
@ -52,6 +53,7 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
|
||||||
} else {
|
} else {
|
||||||
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
|
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
|
||||||
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
|
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
|
||||||
|
ASSERT(pIter->len > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pIter->len > pIter->totalLen) {
|
if (pIter->len > pIter->totalLen) {
|
||||||
|
|
|
@ -226,9 +226,9 @@ static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const S
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
|
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
|
||||||
bool tqNextDataBlock(STqReadHandle *pHandle);
|
bool tqNextDataBlock(STqReadHandle *pHandle);
|
||||||
int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo);
|
int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo);
|
||||||
// return SArray<SColumnInfoData>
|
// return SArray<SColumnInfoData>
|
||||||
SArray *tqRetrieveDataBlock(STqReadHandle *pHandle);
|
SArray *tqRetrieveDataBlock(STqReadHandle *pHandle);
|
||||||
|
|
||||||
|
|
|
@ -31,13 +31,28 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
|
||||||
return pReadHandle;
|
return pReadHandle;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
|
int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
|
||||||
pReadHandle->pMsg = pMsg;
|
pReadHandle->pMsg = pMsg;
|
||||||
pMsg->length = htonl(pMsg->length);
|
pMsg->length = htonl(pMsg->length);
|
||||||
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
||||||
tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
|
|
||||||
|
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
|
||||||
|
while (true) {
|
||||||
|
if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
|
||||||
|
if (pReadHandle->pBlock == NULL) break;
|
||||||
|
|
||||||
|
pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid);
|
||||||
|
pReadHandle->pBlock->tid = htonl(pReadHandle->pBlock->tid);
|
||||||
|
pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion);
|
||||||
|
pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen);
|
||||||
|
pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen);
|
||||||
|
pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
|
||||||
pReadHandle->ver = ver;
|
pReadHandle->ver = ver;
|
||||||
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
|
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tqNextDataBlock(STqReadHandle* pHandle) {
|
bool tqNextDataBlock(STqReadHandle* pHandle) {
|
||||||
|
@ -47,19 +62,19 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
|
||||||
}
|
}
|
||||||
if (pHandle->pBlock == NULL) return false;
|
if (pHandle->pBlock == NULL) return false;
|
||||||
|
|
||||||
pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);
|
/*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/
|
||||||
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
|
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
|
||||||
ASSERT(pHandle->tbIdHash);
|
ASSERT(pHandle->tbIdHash);
|
||||||
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
|
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
|
||||||
if (ret != NULL) {
|
if (ret != NULL) {
|
||||||
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
|
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
|
||||||
pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);
|
/*pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);*/
|
||||||
pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);
|
/*pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);*/
|
||||||
pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);
|
/*pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);*/
|
||||||
pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);
|
/*pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);*/
|
||||||
pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);
|
/*pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);*/
|
||||||
return true;
|
return true;
|
||||||
} else {
|
/*} else {*/
|
||||||
/*printf("skip one tb %ld\n", pHandle->pBlock->uid);*/
|
/*printf("skip one tb %ld\n", pHandle->pBlock->uid);*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,9 +14,9 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
#include "tq.h"
|
|
||||||
#include "executorimpl.h"
|
#include "executorimpl.h"
|
||||||
#include "planner.h"
|
#include "planner.h"
|
||||||
|
#include "tq.h"
|
||||||
|
|
||||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) {
|
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) {
|
||||||
ASSERT(pOperator != NULL);
|
ASSERT(pOperator != NULL);
|
||||||
|
@ -34,7 +34,10 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id)
|
||||||
return doSetStreamBlock(pOperator->pDownstream[0], input, id);
|
return doSetStreamBlock(pOperator->pDownstream[0], input, id);
|
||||||
} else {
|
} else {
|
||||||
SStreamBlockScanInfo* pInfo = pOperator->info;
|
SStreamBlockScanInfo* pInfo = pOperator->info;
|
||||||
tqReadHandleSetMsg(pInfo->readerHandle, input, 0);
|
if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) {
|
||||||
|
qError("submit msg error while set stream msg, %s" PRIx64, id);
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -48,9 +51,9 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
|
||||||
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*) input, GET_TASKID(pTaskInfo));
|
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
|
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue