refine query interface
This commit is contained in:
parent
5ebc377965
commit
93f950c740
|
@ -1525,9 +1525,23 @@ typedef struct SMqSetCVgReq {
|
|||
char* sql;
|
||||
char* logicalPlan;
|
||||
char* physicalPlan;
|
||||
SArray* tasks; // SArray<SSubQueryMsg>
|
||||
SSubQueryMsg msg;
|
||||
} SMqSetCVgReq;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) {
|
||||
int32_t tlen = sizeof(SSubQueryMsg) + pMsg->contentLen;
|
||||
if (buf == NULL) return tlen;
|
||||
memcpy(*buf, pMsg, tlen);
|
||||
*buf = POINTER_SHIFT(*buf, tlen);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
|
||||
int32_t tlen = sizeof(SSubQueryMsg) + ((SSubQueryMsg*)buf)->contentLen;
|
||||
memcpy(pMsg, buf, tlen);
|
||||
return POINTER_SHIFT(buf, tlen);
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI32(buf, pReq->vgId);
|
||||
|
@ -1537,6 +1551,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
|
|||
tlen += taosEncodeString(buf, pReq->sql);
|
||||
tlen += taosEncodeString(buf, pReq->logicalPlan);
|
||||
tlen += taosEncodeString(buf, pReq->physicalPlan);
|
||||
tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
|
@ -1548,7 +1563,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
|
|||
buf = taosDecodeString(buf, &pReq->sql);
|
||||
buf = taosDecodeString(buf, &pReq->logicalPlan);
|
||||
buf = taosDecodeString(buf, &pReq->physicalPlan);
|
||||
pReq->tasks = NULL;
|
||||
buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
|
||||
return buf;
|
||||
}
|
||||
|
||||
|
|
|
@ -109,12 +109,71 @@ typedef struct STableMetaOutput {
|
|||
STableMeta *tbMeta;
|
||||
} STableMetaOutput;
|
||||
|
||||
const SSchema* tGetTbnameColumnSchema();
|
||||
typedef struct SDataBuf {
|
||||
void *pData;
|
||||
uint32_t len;
|
||||
} SDataBuf;
|
||||
|
||||
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
|
||||
typedef int32_t (*__async_exec_fn_t)(void* param);
|
||||
|
||||
typedef struct SMsgSendInfo {
|
||||
__async_send_cb_fn_t fp; //async callback function
|
||||
void *param;
|
||||
uint64_t requestId;
|
||||
uint64_t requestObjRefId;
|
||||
int32_t msgType;
|
||||
SDataBuf msgInfo;
|
||||
} SMsgSendInfo;
|
||||
|
||||
typedef struct SQueryNodeAddr {
|
||||
int32_t nodeId; // vgId or qnodeId
|
||||
int8_t inUse;
|
||||
int8_t numOfEps;
|
||||
SEpAddr epAddr[TSDB_MAX_REPLICA];
|
||||
} SQueryNodeAddr;
|
||||
|
||||
static FORCE_INLINE void tConvertQueryAddrToEpSet(SEpSet* pEpSet, const SQueryNodeAddr* pAddr) {
|
||||
pEpSet->inUse = pAddr->inUse;
|
||||
pEpSet->numOfEps = pAddr->numOfEps;
|
||||
for (int j = 0; j < TSDB_MAX_REPLICA; j++) {
|
||||
pEpSet->port[j] = pAddr->epAddr[j].port;
|
||||
memcpy(pEpSet->fqdn[j], pAddr->epAddr[j].fqdn, TSDB_FQDN_LEN);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t initTaskQueue();
|
||||
int32_t cleanupTaskQueue();
|
||||
|
||||
/**
|
||||
*
|
||||
* @param execFn The asynchronously execution function
|
||||
* @param execParam The parameters of the execFn
|
||||
* @param code The response code during execution the execFn
|
||||
* @return
|
||||
*/
|
||||
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
|
||||
|
||||
/**
|
||||
* Asynchronously send message to server, after the response received, the callback will be incured.
|
||||
*
|
||||
* @param pTransporter
|
||||
* @param epSet
|
||||
* @param pTransporterId
|
||||
* @param pInfo
|
||||
* @return
|
||||
*/
|
||||
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
|
||||
|
||||
void initQueryModuleMsgHandle();
|
||||
|
||||
const SSchema* tGetTbnameColumnSchema();
|
||||
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
|
||||
|
||||
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
|
||||
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize);
|
||||
|
||||
|
||||
#define SET_META_TYPE_NONE(t) (t) = META_TYPE_NON_TABLE
|
||||
#define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE
|
||||
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
|
||||
|
|
|
@ -83,56 +83,6 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
|
|||
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
||||
int rpcReportProgress(void *pConn, char *pCont, int contLen);
|
||||
void rpcCancelRequest(int64_t rid);
|
||||
|
||||
typedef struct SDataBuf {
|
||||
void *pData;
|
||||
uint32_t len;
|
||||
} SDataBuf;
|
||||
|
||||
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
|
||||
typedef int32_t (*__async_exec_fn_t)(void* param);
|
||||
|
||||
typedef struct SMsgSendInfo {
|
||||
__async_send_cb_fn_t fp; //async callback function
|
||||
void *param;
|
||||
uint64_t requestId;
|
||||
uint64_t requestObjRefId;
|
||||
int32_t msgType;
|
||||
SDataBuf msgInfo;
|
||||
} SMsgSendInfo;
|
||||
|
||||
typedef struct SQueryNodeAddr {
|
||||
int32_t nodeId; // vgId or qnodeId
|
||||
int8_t inUse;
|
||||
int8_t numOfEps;
|
||||
SEpAddr epAddr[TSDB_MAX_REPLICA];
|
||||
} SQueryNodeAddr;
|
||||
|
||||
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
|
||||
|
||||
int32_t initTaskQueue();
|
||||
int32_t cleanupTaskQueue();
|
||||
|
||||
/**
|
||||
*
|
||||
* @param execFn The asynchronously execution function
|
||||
* @param execParam The parameters of the execFn
|
||||
* @param code The response code during execution the execFn
|
||||
* @return
|
||||
*/
|
||||
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
|
||||
|
||||
/**
|
||||
* Asynchronously send message to server, after the response received, the callback will be incured.
|
||||
*
|
||||
* @param pTransporter
|
||||
* @param epSet
|
||||
* @param pTransporterId
|
||||
* @param pInfo
|
||||
* @return
|
||||
*/
|
||||
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -327,11 +327,13 @@ typedef struct SMqTopicConsumer {
|
|||
#endif
|
||||
|
||||
typedef struct SMqConsumerEp {
|
||||
int32_t vgId; // -1 for unassigned
|
||||
SEpSet epset;
|
||||
int64_t consumerId; // -1 for unassigned
|
||||
int64_t lastConsumerHbTs;
|
||||
int64_t lastVgHbTs;
|
||||
int32_t vgId; // -1 for unassigned
|
||||
SEpSet epset;
|
||||
int64_t consumerId; // -1 for unassigned
|
||||
int64_t lastConsumerHbTs;
|
||||
int64_t lastVgHbTs;
|
||||
int32_t execLen;
|
||||
SSubQueryMsg qExec;
|
||||
} SMqConsumerEp;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
|
||||
|
@ -339,6 +341,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon
|
|||
tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId);
|
||||
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epset);
|
||||
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
|
||||
tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
|
@ -346,6 +349,8 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu
|
|||
buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId);
|
||||
buf = taosDecodeSEpSet(buf, &pConsumerEp->epset);
|
||||
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
|
||||
buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
|
||||
pConsumerEp->execLen = sizeof(SSubQueryMsg) + pConsumerEp->qExec.contentLen;
|
||||
return buf;
|
||||
}
|
||||
|
||||
|
|
|
@ -105,6 +105,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
|||
strcpy(req.sql, pTopic->sql);
|
||||
strcpy(req.logicalPlan, pTopic->logicalPlan);
|
||||
strcpy(req.physicalPlan, pTopic->physicalPlan);
|
||||
memcpy(&req.msg, &pCEp->qExec, pCEp->execLen);
|
||||
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
|
||||
void *reqStr = malloc(tlen);
|
||||
if (reqStr == NULL) {
|
||||
|
@ -143,7 +144,21 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
|||
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
|
||||
SMqConsumerEp CEp;
|
||||
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
|
||||
int32_t sz;
|
||||
//convert phyplan to dag
|
||||
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
|
||||
SArray *pArray;
|
||||
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
|
||||
|
||||
}
|
||||
int32_t sz = taosArrayGetSize(pArray);
|
||||
//convert dag to msg
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
STaskInfo* pTaskInfo = taosArrayGet(pArray, i);
|
||||
int32_t vgId = pTaskInfo->addr.nodeId;
|
||||
SEpSet epSet;
|
||||
tConvertQueryAddrToEpSet(&epSet, &pTaskInfo->addr);
|
||||
}
|
||||
/*pTopic->physicalPlan;*/
|
||||
SVgObj *pVgroup = NULL;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = sdbFetch(pSdb, SDB_VGROUP, NULL, (void **)&pVgroup);
|
||||
|
@ -156,6 +171,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
|
|||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
}
|
||||
return 0;
|
||||
qDestroyQueryDag(pDag);
|
||||
}
|
||||
|
||||
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
|
||||
|
|
|
@ -681,7 +681,6 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
|
|||
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
|
||||
// TODO: handle error
|
||||
}
|
||||
ASSERT(taosArrayGetSize(pArray) == 0);
|
||||
STaskInfo *pInfo = taosArrayGet(pArray, 0);
|
||||
SArray* pTasks;
|
||||
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
|
||||
|
@ -733,6 +732,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo)
|
|||
// TODO: filter out unused column
|
||||
return 0;
|
||||
}
|
||||
|
||||
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
||||
int32_t sversion = pHandle->pBlock->sversion;
|
||||
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
|
||||
|
@ -762,7 +762,3 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
|||
taosArrayPush(pArray, &colInfo);
|
||||
return pArray;
|
||||
}
|
||||
/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t
|
||||
* status) {*/
|
||||
/*return 0;*/
|
||||
/*}*/
|
||||
|
|
Loading…
Reference in New Issue