diff --git a/include/util/tdef.h b/include/util/tdef.h index 35655c8eaf..f57d9b6f35 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -333,6 +333,8 @@ do { \ #define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type #define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode +#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type +#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode #define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 9855a65b0b..229d3a9ec3 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -75,7 +75,7 @@ struct tmq_message_t { SMqConsumeRsp rsp; }; -typedef struct SMqClientVg { +typedef struct { // statistics int64_t pollCnt; // offset @@ -86,7 +86,7 @@ typedef struct SMqClientVg { SEpSet epSet; } SMqClientVg; -typedef struct SMqClientTopic { +typedef struct { // subscribe info int32_t sqlLen; char* sql; @@ -779,11 +779,9 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { param->pVg = pVg; tsem_init(¶m->rspSem, 0, 0); - SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL}; - SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->requestObjRefId = 0; sendInfo->param = param; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ac46c0c48c..46feab7791 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -34,6 +34,7 @@ int32_t tInitSubmitMsgIter(SSubmitReq *pMsg, SSubmitMsgIter *pIter) { } pIter->totalLen = pMsg->length; + ASSERT(pIter->totalLen > 0); pIter->len = 0; pIter->pMsg = pMsg; if (pMsg->length <= sizeof(SSubmitReq)) { @@ -52,6 +53,7 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { } else { SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen); + ASSERT(pIter->len > 0); } if (pIter->len > pIter->totalLen) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 1fb0b05f9e..3a06674e3c 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -226,9 +226,9 @@ static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const S return 0; } -void tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); -bool tqNextDataBlock(STqReadHandle *pHandle); -int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo); +int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); +bool tqNextDataBlock(STqReadHandle *pHandle); +int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo); // return SArray SArray *tqRetrieveDataBlock(STqReadHandle *pHandle); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index e76d43becd..bfa811feec 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -31,13 +31,28 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { return pReadHandle; } -void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) { +int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) { pReadHandle->pMsg = pMsg; pMsg->length = htonl(pMsg->length); pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter); + + if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; + while (true) { + if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1; + if (pReadHandle->pBlock == NULL) break; + + pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid); + pReadHandle->pBlock->tid = htonl(pReadHandle->pBlock->tid); + pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion); + pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen); + pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen); + pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows); + } + + if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; pReadHandle->ver = ver; memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter)); + return 0; } bool tqNextDataBlock(STqReadHandle* pHandle) { @@ -47,19 +62,19 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { } if (pHandle->pBlock == NULL) return false; - pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid); + /*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/ /*if (pHandle->tbUid == pHandle->pBlock->uid) {*/ ASSERT(pHandle->tbIdHash); void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t)); if (ret != NULL) { /*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/ - pHandle->pBlock->tid = htonl(pHandle->pBlock->tid); - pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion); - pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen); - pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen); - pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows); + /*pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);*/ + /*pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);*/ + /*pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);*/ + /*pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);*/ + /*pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);*/ return true; - } else { + /*} else {*/ /*printf("skip one tb %ld\n", pHandle->pBlock->uid);*/ } } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c47c83ba29..18485249b3 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -14,9 +14,9 @@ */ #include "executor.h" -#include "tq.h" #include "executorimpl.h" #include "planner.h" +#include "tq.h" static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) { ASSERT(pOperator != NULL); @@ -34,7 +34,10 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) return doSetStreamBlock(pOperator->pDownstream[0], input, id); } else { SStreamBlockScanInfo* pInfo = pOperator->info; - tqReadHandleSetMsg(pInfo->readerHandle, input, 0); + if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { + qError("submit msg error while set stream msg, %s" PRIx64, id); + return TSDB_CODE_QRY_APP_ERROR; + } return TSDB_CODE_SUCCESS; } } @@ -48,9 +51,9 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) { return TSDB_CODE_SUCCESS; } - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo; + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*) input, GET_TASKID(pTaskInfo)); + int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); } else {