extract convert to set msg
This commit is contained in:
parent
82bcecc4ff
commit
7ee6657cf7
|
@ -94,20 +94,21 @@ extern const int32_t TYPE_BYTES[15];
|
||||||
#define TSDB_TIME_PRECISION_MICRO_STR "us"
|
#define TSDB_TIME_PRECISION_MICRO_STR "us"
|
||||||
#define TSDB_TIME_PRECISION_NANO_STR "ns"
|
#define TSDB_TIME_PRECISION_NANO_STR "ns"
|
||||||
|
|
||||||
#define TSDB_TICK_PER_SECOND(precision) ((int64_t)((precision)==TSDB_TIME_PRECISION_MILLI ? 1e3L : ((precision)==TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L)))
|
#define TSDB_TICK_PER_SECOND(precision) \
|
||||||
|
((int64_t)((precision) == TSDB_TIME_PRECISION_MILLI ? 1e3L \
|
||||||
|
: ((precision) == TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L)))
|
||||||
|
|
||||||
#define T_MEMBER_SIZE(type, member) sizeof(((type *)0)->member)
|
#define T_MEMBER_SIZE(type, member) sizeof(((type *)0)->member)
|
||||||
#define T_APPEND_MEMBER(dst, ptr, type, member) \
|
#define T_APPEND_MEMBER(dst, ptr, type, member) \
|
||||||
do {\
|
do { \
|
||||||
memcpy((void *)(dst), (void *)(&((ptr)->member)), T_MEMBER_SIZE(type, member));\
|
memcpy((void *)(dst), (void *)(&((ptr)->member)), T_MEMBER_SIZE(type, member)); \
|
||||||
dst = (void *)((char *)(dst) + T_MEMBER_SIZE(type, member));\
|
dst = (void *)((char *)(dst) + T_MEMBER_SIZE(type, member)); \
|
||||||
} while(0)
|
} while (0)
|
||||||
#define T_READ_MEMBER(src, type, target) \
|
#define T_READ_MEMBER(src, type, target) \
|
||||||
do { \
|
do { \
|
||||||
(target) = *(type *)(src); \
|
(target) = *(type *)(src); \
|
||||||
(src) = (void *)((char *)src + sizeof(type));\
|
(src) = (void *)((char *)src + sizeof(type)); \
|
||||||
} while(0)
|
} while (0)
|
||||||
|
|
||||||
|
|
||||||
// TODO: check if below is necessary
|
// TODO: check if below is necessary
|
||||||
#define TSDB_RELATION_INVALID 0
|
#define TSDB_RELATION_INVALID 0
|
||||||
|
@ -159,7 +160,7 @@ do { \
|
||||||
#define TSDB_ACCT_ID_LEN 11
|
#define TSDB_ACCT_ID_LEN 11
|
||||||
|
|
||||||
#define TSDB_MAX_COLUMNS 4096
|
#define TSDB_MAX_COLUMNS 4096
|
||||||
#define TSDB_MIN_COLUMNS 2 //PRIMARY COLUMN(timestamp) + other columns
|
#define TSDB_MIN_COLUMNS 2 // PRIMARY COLUMN(timestamp) + other columns
|
||||||
|
|
||||||
#define TSDB_NODE_NAME_LEN 64
|
#define TSDB_NODE_NAME_LEN 64
|
||||||
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
|
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
|
||||||
|
@ -184,11 +185,11 @@ do { \
|
||||||
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
|
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
|
||||||
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
|
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
|
||||||
#define TSDB_MAX_SQL_SHOW_LEN 1024
|
#define TSDB_MAX_SQL_SHOW_LEN 1024
|
||||||
#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024u) // sql length should be less than 1mb
|
#define TSDB_MAX_ALLOWED_SQL_LEN (1 * 1024 * 1024u) // sql length should be less than 1mb
|
||||||
|
|
||||||
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN
|
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN
|
||||||
#define TSDB_STB_COMMENT_LEN 1024
|
#define TSDB_STB_COMMENT_LEN 1024
|
||||||
/**
|
/**
|
||||||
* In some scenarios uint16_t (0~65535) is used to store the row len.
|
* In some scenarios uint16_t (0~65535) is used to store the row len.
|
||||||
* - Firstly, we use 65531(65535 - 4), as the SDataRow/SKVRow contains 4 bits header.
|
* - Firstly, we use 65531(65535 - 4), as the SDataRow/SKVRow contains 4 bits header.
|
||||||
* - Secondly, if all cols are VarDataT type except primary key, we need 4 bits to store the offset, thus
|
* - Secondly, if all cols are VarDataT type except primary key, we need 4 bits to store the offset, thus
|
||||||
|
@ -235,7 +236,7 @@ do { \
|
||||||
#define TSDB_DB_TYPE_DEFAULT 0
|
#define TSDB_DB_TYPE_DEFAULT 0
|
||||||
#define TSDB_DB_TYPE_TOPIC 1
|
#define TSDB_DB_TYPE_TOPIC 1
|
||||||
|
|
||||||
#define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE
|
#define TSDB_DEFAULT_PKT_SIZE 65480 // same as RPC_MAX_UDP_SIZE
|
||||||
|
|
||||||
#define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE
|
#define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE
|
||||||
#define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value
|
#define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value
|
||||||
|
@ -323,8 +324,8 @@ do { \
|
||||||
#define TSDB_MAX_UNION_CLAUSE 5
|
#define TSDB_MAX_UNION_CLAUSE 5
|
||||||
|
|
||||||
#define TSDB_MAX_FIELD_LEN 16384
|
#define TSDB_MAX_FIELD_LEN 16384
|
||||||
#define TSDB_MAX_BINARY_LEN (TSDB_MAX_FIELD_LEN-TSDB_KEYSIZE) // keep 16384
|
#define TSDB_MAX_BINARY_LEN (TSDB_MAX_FIELD_LEN - TSDB_KEYSIZE) // keep 16384
|
||||||
#define TSDB_MAX_NCHAR_LEN (TSDB_MAX_FIELD_LEN-TSDB_KEYSIZE) // keep 16384
|
#define TSDB_MAX_NCHAR_LEN (TSDB_MAX_FIELD_LEN - TSDB_KEYSIZE) // keep 16384
|
||||||
#define PRIMARYKEY_TIMESTAMP_COL_ID 1
|
#define PRIMARYKEY_TIMESTAMP_COL_ID 1
|
||||||
#define COL_REACH_END(colId, maxColId) ((colId) > (maxColId))
|
#define COL_REACH_END(colId, maxColId) ((colId) > (maxColId))
|
||||||
|
|
||||||
|
@ -333,11 +334,8 @@ do { \
|
||||||
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
|
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
|
||||||
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
|
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
|
||||||
|
|
||||||
|
|
||||||
#define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default
|
#define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* 1. ordinary sub query for select * from super_table
|
* 1. ordinary sub query for select * from super_table
|
||||||
* 2. all sqlobj generated by createSubqueryObj with this flag
|
* 2. all sqlobj generated by createSubqueryObj with this flag
|
||||||
|
@ -376,7 +374,7 @@ do { \
|
||||||
#define TSDB_DEFAULT_STABLES_HASH_SIZE 100
|
#define TSDB_DEFAULT_STABLES_HASH_SIZE 100
|
||||||
#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000
|
#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000
|
||||||
|
|
||||||
#define TSDB_MAX_WAL_SIZE (1024*1024*3)
|
#define TSDB_MAX_WAL_SIZE (1024 * 1024 * 3)
|
||||||
|
|
||||||
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P
|
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P
|
||||||
|
|
||||||
|
|
|
@ -75,7 +75,7 @@ struct tmq_message_t {
|
||||||
SMqConsumeRsp rsp;
|
SMqConsumeRsp rsp;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct SMqClientVg {
|
typedef struct {
|
||||||
// statistics
|
// statistics
|
||||||
int64_t pollCnt;
|
int64_t pollCnt;
|
||||||
// offset
|
// offset
|
||||||
|
@ -86,7 +86,7 @@ typedef struct SMqClientVg {
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
} SMqClientVg;
|
} SMqClientVg;
|
||||||
|
|
||||||
typedef struct SMqClientTopic {
|
typedef struct {
|
||||||
// subscribe info
|
// subscribe info
|
||||||
int32_t sqlLen;
|
int32_t sqlLen;
|
||||||
char* sql;
|
char* sql;
|
||||||
|
@ -779,11 +779,9 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
||||||
param->pVg = pVg;
|
param->pVg = pVg;
|
||||||
tsem_init(¶m->rspSem, 0, 0);
|
tsem_init(¶m->rspSem, 0, 0);
|
||||||
|
|
||||||
|
|
||||||
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
|
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
|
||||||
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL};
|
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL};
|
||||||
|
|
||||||
|
|
||||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
||||||
sendInfo->requestObjRefId = 0;
|
sendInfo->requestObjRefId = 0;
|
||||||
sendInfo->param = param;
|
sendInfo->param = param;
|
||||||
|
|
|
@ -34,6 +34,7 @@ int32_t tInitSubmitMsgIter(SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pIter->totalLen = pMsg->length;
|
pIter->totalLen = pMsg->length;
|
||||||
|
ASSERT(pIter->totalLen > 0);
|
||||||
pIter->len = 0;
|
pIter->len = 0;
|
||||||
pIter->pMsg = pMsg;
|
pIter->pMsg = pMsg;
|
||||||
if (pMsg->length <= sizeof(SSubmitReq)) {
|
if (pMsg->length <= sizeof(SSubmitReq)) {
|
||||||
|
@ -52,6 +53,7 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
|
||||||
} else {
|
} else {
|
||||||
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
|
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
|
||||||
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
|
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
|
||||||
|
ASSERT(pIter->len > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pIter->len > pIter->totalLen) {
|
if (pIter->len > pIter->totalLen) {
|
||||||
|
|
|
@ -226,7 +226,7 @@ static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const S
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
|
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
|
||||||
bool tqNextDataBlock(STqReadHandle *pHandle);
|
bool tqNextDataBlock(STqReadHandle *pHandle);
|
||||||
int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo);
|
int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo);
|
||||||
// return SArray<SColumnInfoData>
|
// return SArray<SColumnInfoData>
|
||||||
|
|
|
@ -31,13 +31,28 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
|
||||||
return pReadHandle;
|
return pReadHandle;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
|
int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
|
||||||
pReadHandle->pMsg = pMsg;
|
pReadHandle->pMsg = pMsg;
|
||||||
pMsg->length = htonl(pMsg->length);
|
pMsg->length = htonl(pMsg->length);
|
||||||
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
||||||
tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
|
|
||||||
|
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
|
||||||
|
while (true) {
|
||||||
|
if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
|
||||||
|
if (pReadHandle->pBlock == NULL) break;
|
||||||
|
|
||||||
|
pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid);
|
||||||
|
pReadHandle->pBlock->tid = htonl(pReadHandle->pBlock->tid);
|
||||||
|
pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion);
|
||||||
|
pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen);
|
||||||
|
pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen);
|
||||||
|
pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
|
||||||
pReadHandle->ver = ver;
|
pReadHandle->ver = ver;
|
||||||
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
|
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tqNextDataBlock(STqReadHandle* pHandle) {
|
bool tqNextDataBlock(STqReadHandle* pHandle) {
|
||||||
|
@ -47,19 +62,19 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
|
||||||
}
|
}
|
||||||
if (pHandle->pBlock == NULL) return false;
|
if (pHandle->pBlock == NULL) return false;
|
||||||
|
|
||||||
pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);
|
/*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/
|
||||||
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
|
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
|
||||||
ASSERT(pHandle->tbIdHash);
|
ASSERT(pHandle->tbIdHash);
|
||||||
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
|
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
|
||||||
if (ret != NULL) {
|
if (ret != NULL) {
|
||||||
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
|
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
|
||||||
pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);
|
/*pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);*/
|
||||||
pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);
|
/*pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);*/
|
||||||
pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);
|
/*pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);*/
|
||||||
pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);
|
/*pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);*/
|
||||||
pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);
|
/*pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);*/
|
||||||
return true;
|
return true;
|
||||||
} else {
|
/*} else {*/
|
||||||
/*printf("skip one tb %ld\n", pHandle->pBlock->uid);*/
|
/*printf("skip one tb %ld\n", pHandle->pBlock->uid);*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,9 +14,9 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
#include "tq.h"
|
|
||||||
#include "executorimpl.h"
|
#include "executorimpl.h"
|
||||||
#include "planner.h"
|
#include "planner.h"
|
||||||
|
#include "tq.h"
|
||||||
|
|
||||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) {
|
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) {
|
||||||
ASSERT(pOperator != NULL);
|
ASSERT(pOperator != NULL);
|
||||||
|
@ -34,7 +34,10 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id)
|
||||||
return doSetStreamBlock(pOperator->pDownstream[0], input, id);
|
return doSetStreamBlock(pOperator->pDownstream[0], input, id);
|
||||||
} else {
|
} else {
|
||||||
SStreamBlockScanInfo* pInfo = pOperator->info;
|
SStreamBlockScanInfo* pInfo = pOperator->info;
|
||||||
tqReadHandleSetMsg(pInfo->readerHandle, input, 0);
|
if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) {
|
||||||
|
qError("submit msg error while set stream msg, %s" PRIx64, id);
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -48,9 +51,9 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
|
||||||
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*) input, GET_TASKID(pTaskInfo));
|
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
|
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue