Merge branch '3.0' into fix/TD-16561
This commit is contained in:
commit
578dd9b75b
|
@ -88,9 +88,9 @@ int32_t create_stream() {
|
|||
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
|
||||
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
||||
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
||||
pRes = taos_query(pConn,
|
||||
"create stream stream1 trigger at_once into abc2.outstb as select _wstartts, sum(k) from st1 "
|
||||
"partition by tbname interval(10m) ");
|
||||
pRes = taos_query(
|
||||
pConn,
|
||||
"create stream stream1 trigger at_once into abc1.outstb as select _wstartts, sum(k) from st1 interval(10m) ");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
@ -107,11 +107,4 @@ int main(int argc, char* argv[]) {
|
|||
code = init_env();
|
||||
}
|
||||
create_stream();
|
||||
#if 0
|
||||
tmq_t* tmq = build_consumer();
|
||||
tmq_list_t* topic_list = build_topic_list();
|
||||
/*perf_loop(tmq, topic_list);*/
|
||||
/*basic_consume_loop(tmq, topic_list);*/
|
||||
sync_consume_loop(tmq, topic_list);
|
||||
#endif
|
||||
}
|
||||
|
|
|
@ -34,7 +34,6 @@ typedef enum {
|
|||
WRITE_QUEUE,
|
||||
APPLY_QUEUE,
|
||||
SYNC_QUEUE,
|
||||
MERGE_QUEUE,
|
||||
QUEUE_MAX,
|
||||
} EQueueType;
|
||||
|
||||
|
|
|
@ -16,8 +16,8 @@
|
|||
#ifndef _TD_SNODE_H_
|
||||
#define _TD_SNODE_H_
|
||||
|
||||
#include "tmsgcb.h"
|
||||
#include "tmsg.h"
|
||||
#include "tmsgcb.h"
|
||||
#include "trpc.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
@ -68,8 +68,8 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad);
|
|||
* @param pMsg The request message
|
||||
* @param pRsp The response message
|
||||
*/
|
||||
void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg);
|
||||
void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg);
|
||||
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg);
|
||||
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -36,7 +36,6 @@ typedef struct SPlanContext {
|
|||
int64_t watermark;
|
||||
char* pMsg;
|
||||
int32_t msgLen;
|
||||
// double filesFactor;
|
||||
} SPlanContext;
|
||||
|
||||
// Create the physical plan for the query, according to the AST.
|
||||
|
|
|
@ -152,7 +152,7 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
|
|||
typedef struct {
|
||||
char* qmsg;
|
||||
// followings are not applicable to encoder and decoder
|
||||
void* inputHandle;
|
||||
// void* inputHandle;
|
||||
void* executor;
|
||||
} STaskExec;
|
||||
|
||||
|
@ -240,12 +240,13 @@ struct SStreamTask {
|
|||
int8_t inputType;
|
||||
int8_t status;
|
||||
|
||||
int8_t sourceType;
|
||||
int8_t execType;
|
||||
int8_t sinkType;
|
||||
int8_t dispatchType;
|
||||
int16_t dispatchMsgType;
|
||||
|
||||
int8_t dataScan;
|
||||
|
||||
// node info
|
||||
int32_t childId;
|
||||
int32_t nodeId;
|
||||
|
|
|
@ -95,9 +95,12 @@ SArray *smGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
_OVER:
|
||||
|
|
|
@ -55,7 +55,9 @@ static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t num
|
|||
taosGetQitem(qall, (void **)&pMsg);
|
||||
|
||||
dTrace("msg:%p, get from snode-unique queue", pMsg);
|
||||
sndProcessUMsg(pMgmt->pSnode, pMsg);
|
||||
if (sndProcessUMsg(pMgmt->pSnode, pMsg) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
dTrace("msg:%p, is freed", pMsg);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
|
@ -67,7 +69,9 @@ static void smProcessSharedQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
SSnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
|
||||
dTrace("msg:%p, get from snode-shared queue", pMsg);
|
||||
sndProcessSMsg(pMgmt->pSnode, pMsg);
|
||||
if (sndProcessSMsg(pMgmt->pSnode, pMsg) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
dTrace("msg:%p, is freed", pMsg);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
|
|
|
@ -170,10 +170,6 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
|||
dTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pSyncQ, pMsg);
|
||||
break;
|
||||
case MERGE_QUEUE:
|
||||
dTrace("vgId:%d, msg:%p put into vnode-merge queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pMergeQ, pMsg);
|
||||
break;
|
||||
case APPLY_QUEUE:
|
||||
dTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pApplyQ, pMsg);
|
||||
|
@ -196,8 +192,6 @@ int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsg
|
|||
|
||||
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); }
|
||||
|
||||
int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, MERGE_QUEUE); }
|
||||
|
||||
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
dTrace("msg:%p, put into vnode-mgmt queue", pMsg);
|
||||
taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);
|
||||
|
@ -243,9 +237,6 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
|||
case FETCH_QUEUE:
|
||||
size = taosQueueItemSize(pVnode->pFetchQ);
|
||||
break;
|
||||
case MERGE_QUEUE:
|
||||
size = taosQueueItemSize(pVnode->pMergeQ);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -63,9 +63,8 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64
|
|||
.topicQuery = false,
|
||||
.streamQuery = true,
|
||||
.rSmaQuery = true,
|
||||
.triggerType = STREAM_TRIGGER_AT_ONCE,
|
||||
.triggerType = triggerType,
|
||||
.watermark = watermark,
|
||||
/*.filesFactor = filesFactor,*/
|
||||
};
|
||||
|
||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
||||
|
@ -270,7 +269,6 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb
|
|||
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
|
||||
// source
|
||||
pTask->sourceType = TASK_SOURCE__MERGE;
|
||||
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
|
||||
|
||||
// exec
|
||||
|
@ -316,7 +314,6 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj*
|
|||
#endif
|
||||
pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);
|
||||
// source
|
||||
pTask->sourceType = TASK_SOURCE__MERGE;
|
||||
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
|
||||
|
||||
// exec
|
||||
|
@ -427,6 +424,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
||||
mndAddTaskToTaskSet(taskSourceLevel, pTask);
|
||||
|
||||
pTask->dataScan = 1;
|
||||
|
||||
// input
|
||||
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
|
||||
|
||||
|
@ -470,6 +469,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
||||
mndAddTaskToTaskSet(taskOneLevel, pTask);
|
||||
|
||||
pTask->dataScan = 1;
|
||||
|
||||
// input
|
||||
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
|
||||
|
||||
|
|
|
@ -56,7 +56,6 @@ SStreamTask* sndMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
|
|||
int32_t sndMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
||||
|
||||
int32_t sndDropTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
||||
|
||||
int32_t sndStopTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
||||
int32_t sndResumeTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
||||
|
||||
|
|
|
@ -76,45 +76,158 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) {
|
|||
return taosHashRemove(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
}
|
||||
|
||||
static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
/*SStreamExecMsgHead *pHead = pMsg->pCont;*/
|
||||
/*int32_t taskId = pHead->streamTaskId;*/
|
||||
/*SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId);*/
|
||||
/*if (pTask == NULL) {*/
|
||||
/*return -1;*/
|
||||
/*}*/
|
||||
static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
SStreamMeta *pMeta = pNode->pMeta;
|
||||
char *msg = pMsg->pCont;
|
||||
int32_t msgLen = pMsg->contLen;
|
||||
|
||||
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t *)msg, msgLen);
|
||||
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
pTask->status = TASK_STATUS__IDLE;
|
||||
|
||||
pTask->inputQueue = streamQueueOpen();
|
||||
pTask->outputQueue = streamQueueOpen();
|
||||
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||
pTask->outputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||
|
||||
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) goto FAIL;
|
||||
|
||||
pTask->pMsgCb = &pNode->msgCb;
|
||||
|
||||
ASSERT(pTask->execType != TASK_EXEC__NONE);
|
||||
|
||||
SReadHandle handle = {
|
||||
.pMsgCb = &pNode->msgCb,
|
||||
};
|
||||
|
||||
/*pTask->exec.inputHandle = NULL;*/
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||
ASSERT(pTask->exec.executor);
|
||||
|
||||
streamSetupTrigger(pTask);
|
||||
|
||||
qInfo("deploy stream: stream id %ld task id %d child id %d on snode", pTask->streamId, pTask->taskId, pTask->childId);
|
||||
|
||||
return 0;
|
||||
|
||||
FAIL:
|
||||
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
|
||||
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int32_t sndProcessTaskRunReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
SStreamMeta *pMeta = pNode->pMeta;
|
||||
SStreamTaskRunReq *pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
streamTaskProcessRunReq(pTask, &pNode->msgCb);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
static int32_t sndProcessTaskDispatchReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
SStreamMeta *pMeta = pNode->pMeta;
|
||||
|
||||
char *msgStr = pMsg->pCont;
|
||||
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
||||
SStreamDispatchReq req;
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, msgBody, msgLen);
|
||||
tDecodeStreamDispatchReq(&decoder, &req);
|
||||
int32_t taskId = req.taskId;
|
||||
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessDispatchReq(pTask, &pNode->msgCb, &req, &rsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t sndProcessTaskRecoverReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
SStreamMeta *pMeta = pNode->pMeta;
|
||||
|
||||
SStreamTaskRecoverReq *pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
streamProcessRecoverReq(pTask, &pNode->msgCb, pReq, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t sndProcessTaskDispatchRsp(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
SStreamMeta *pMeta = pNode->pMeta;
|
||||
|
||||
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t taskId = pRsp->taskId;
|
||||
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
streamProcessDispatchRsp(pTask, &pNode->msgCb, pRsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t sndProcessTaskRecoverRsp(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
SStreamMeta *pMeta = pNode->pMeta;
|
||||
|
||||
SStreamTaskRecoverRsp *pRsp = pMsg->pCont;
|
||||
int32_t taskId = pRsp->taskId;
|
||||
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||
streamProcessRecoverRsp(pTask, pRsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t sndProcessTaskDropReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||
SStreamMeta *pMeta = pNode->pMeta;
|
||||
|
||||
char *msg = pMsg->pCont;
|
||||
int32_t msgLen = pMsg->contLen;
|
||||
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
|
||||
int32_t code = taosHashRemove(pMeta->pHash, &pReq->taskId, sizeof(int32_t));
|
||||
ASSERT(code == 0);
|
||||
if (code == 0) {
|
||||
// sendrsp
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
// stream deploy
|
||||
// stream stop/resume
|
||||
// operator exec
|
||||
if (pMsg->msgType == TDMT_STREAM_TASK_DEPLOY) {
|
||||
void *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
SStreamTask *pTask = taosMemoryMalloc(sizeof(SStreamTask));
|
||||
if (pTask == NULL) {
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_STREAM_TASK_DEPLOY:
|
||||
return sndProcessTaskDeployReq(pSnode, pMsg);
|
||||
case TDMT_VND_STREAM_TASK_DROP:
|
||||
return sndProcessTaskDropReq(pSnode, pMsg);
|
||||
default:
|
||||
ASSERT(0);
|
||||
return;
|
||||
}
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, msg, pMsg->contLen - sizeof(SMsgHead));
|
||||
tDecodeSStreamTask(&decoder, pTask);
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
sndMetaDeployTask(pSnode->pMeta, pTask);
|
||||
/*} else if (pMsg->msgType == TDMT_SND_TASK_EXEC) {*/
|
||||
/*sndProcessTaskExecReq(pSnode, pMsg);*/
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
// operator exec
|
||||
/*if (pMsg->msgType == TDMT_SND_TASK_EXEC) {*/
|
||||
/*sndProcessTaskExecReq(pSnode, pMsg);*/
|
||||
/*} else {*/
|
||||
ASSERT(0);
|
||||
/*}*/
|
||||
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_STREAM_TASK_RUN:
|
||||
return sndProcessTaskRunReq(pSnode, pMsg);
|
||||
case TDMT_STREAM_TASK_DISPATCH:
|
||||
return sndProcessTaskDispatchReq(pSnode, pMsg);
|
||||
case TDMT_STREAM_TASK_RECOVER:
|
||||
return sndProcessTaskRecoverReq(pSnode, pMsg);
|
||||
case TDMT_STREAM_TASK_DISPATCH_RSP:
|
||||
return sndProcessTaskDispatchRsp(pSnode, pMsg);
|
||||
case TDMT_STREAM_TASK_RECOVER_RSP:
|
||||
return sndProcessTaskRecoverRsp(pSnode, pMsg);
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -116,7 +116,7 @@ typedef void *tsdbReaderT;
|
|||
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
||||
#define BLOCK_LOAD_TABLE_RR_ORDER 3
|
||||
|
||||
tsdbReaderT *tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
|
||||
tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
|
||||
uint64_t taskId);
|
||||
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId,
|
||||
void *pMemRef);
|
||||
|
|
|
@ -121,7 +121,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSu
|
|||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
||||
SSubmitBlkRsp* pRsp);
|
||||
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||
tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||
uint64_t taskId);
|
||||
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||
void* pMemRef);
|
||||
|
|
|
@ -125,10 +125,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
|
||||
if (offset.type == TMQ_OFFSET__SNAPSHOT) {
|
||||
tqDebug("receive offset commit msg to %s on vg %d, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey,
|
||||
pTq->pVnode->config.vgId, offset.uid, offset.ts);
|
||||
TD_VID(pTq->pVnode), offset.uid, offset.ts);
|
||||
} else if (offset.type == TMQ_OFFSET__LOG) {
|
||||
tqDebug("receive offset commit msg to %s on vg %d, offset(type:log) version: %ld", offset.subKey,
|
||||
pTq->pVnode->config.vgId, offset.version);
|
||||
TD_VID(pTq->pVnode), offset.version);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -159,7 +159,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
if (pOffset != NULL) {
|
||||
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
|
||||
tqDebug("consumer %ld, restore offset of %s on vg %d, offset(type:log) version: %ld", consumerId, pReq->subKey,
|
||||
pTq->pVnode->config.vgId, pOffset->version);
|
||||
TD_VID(pTq->pVnode), pOffset->version);
|
||||
fetchOffset = pOffset->version + 1;
|
||||
} else {
|
||||
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
||||
|
@ -167,13 +167,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
|
||||
fetchOffset = walGetCommittedVer(pTq->pWal);
|
||||
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__NONE) {
|
||||
tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s", consumerId,
|
||||
pTq->pVnode->config.vgId, pReq->subKey);
|
||||
tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode),
|
||||
pReq->subKey);
|
||||
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
||||
return -1;
|
||||
}
|
||||
tqDebug("consumer %ld, restore offset of %s on vg %d failed, config is %ld, set to %ld", consumerId, pReq->subKey,
|
||||
pTq->pVnode->config.vgId, pReq->currentOffset, fetchOffset);
|
||||
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -183,14 +183,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
||||
/*ASSERT(pHandle);*/
|
||||
if (pHandle == NULL) {
|
||||
tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, pTq->pVnode->config.vgId,
|
||||
tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode),
|
||||
pReq->subKey);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pHandle->consumerId != consumerId) {
|
||||
tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld",
|
||||
consumerId, pTq->pVnode->config.vgId, pReq->subKey, pHandle->consumerId);
|
||||
consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -304,7 +304,6 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// TODO: persist meta into tdb
|
||||
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
SMqRebVgReq req = {0};
|
||||
tDecodeSMqRebVgReq(msg, &req);
|
||||
|
@ -346,10 +345,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
pHandle->execHandle.execTb.suid = req.suid;
|
||||
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
||||
tsdbGetCtbIdList(pTq->pVnode->pMeta, req.suid, tbUidList);
|
||||
tqDebug("vg %d, tq try get suid: %ld", pTq->pVnode->config.vgId, req.suid);
|
||||
tqDebug("vg %d, tq try get suid: %ld", TD_VID(pTq->pVnode), req.suid);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
||||
tqDebug("vg %d, idx %d, uid: %ld", pTq->pVnode->config.vgId, i, tbUid);
|
||||
tqDebug("vg %d, idx %d, uid: %ld", TD_VID(pTq->pVnode), i, tbUid);
|
||||
}
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
tqReadHandleSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
|
||||
|
@ -400,16 +399,21 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
// exec
|
||||
if (pTask->execType != TASK_EXEC__NONE) {
|
||||
// expand runners
|
||||
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||
SReadHandle handle = {
|
||||
.reader = pStreamReader,
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.pMsgCb = &pTq->pVnode->msgCb,
|
||||
.vnode = pTq->pVnode,
|
||||
};
|
||||
pTask->exec.inputHandle = pStreamReader;
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||
ASSERT(pTask->exec.executor);
|
||||
if (pTask->dataScan) {
|
||||
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||
SReadHandle handle = {
|
||||
.reader = pStreamReader,
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.pMsgCb = &pTq->pVnode->msgCb,
|
||||
.vnode = pTq->pVnode,
|
||||
};
|
||||
/*pTask->exec.inputHandle = pStreamReader;*/
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||
ASSERT(pTask->exec.executor);
|
||||
} else {
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
|
||||
ASSERT(pTask->exec.executor);
|
||||
}
|
||||
}
|
||||
|
||||
// sink
|
||||
|
@ -431,7 +435,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
|
||||
streamSetupTrigger(pTask);
|
||||
|
||||
tqInfo("deploy stream task id %d child id %d on vg %d", pTask->taskId, pTask->childId, pTq->pVnode->config.vgId);
|
||||
tqInfo("deploy stream task id %d child id %d on vg %d", pTask->taskId, pTask->childId, TD_VID(pTq->pVnode));
|
||||
|
||||
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
|
||||
|
||||
|
@ -464,7 +468,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (streamLaunchByWrite(pTask, pTq->pVnode->config.vgId, &pTq->pVnode->msgCb) < 0) {
|
||||
if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode), &pTq->pVnode->msgCb) < 0) {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
|
@ -534,9 +538,9 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
|
|||
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||
int32_t code = taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
|
||||
ASSERT(code == 0);
|
||||
if (code == 0) {
|
||||
// sendrsp
|
||||
}
|
||||
ASSERT(code == 0);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -500,7 +500,7 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||
uint64_t taskId) {
|
||||
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
|
||||
if (pTsdbReadHandle == NULL) {
|
||||
|
@ -508,7 +508,7 @@ tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableLi
|
|||
}
|
||||
|
||||
if (emptyQueryTimewindow(pTsdbReadHandle)) {
|
||||
return (tsdbReaderT*)pTsdbReadHandle;
|
||||
return (tsdbReaderT)pTsdbReadHandle;
|
||||
}
|
||||
|
||||
// todo apply the lastkey of table check to avoid to load header file
|
||||
|
|
|
@ -412,6 +412,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pTblScanNode->scan.pScanCols->length);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
|
||||
|
@ -426,27 +427,57 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
EXPLAIN_ROW_NEW(level + 1, "I/O: ");
|
||||
|
||||
int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo);
|
||||
for (int32_t i = 0; i < nodeNum; ++i) {
|
||||
struct STableScanAnalyzeInfo info = {0};
|
||||
|
||||
int32_t maxIndex = 0;
|
||||
int32_t totalRows = 0;
|
||||
for(int32_t i = 0; i < nodeNum; ++i) {
|
||||
SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, i);
|
||||
STableScanAnalyzeInfo *pScanInfo = (STableScanAnalyzeInfo *)execInfo->verboseInfo;
|
||||
|
||||
EXPLAIN_ROW_APPEND("total_blocks=%d", pScanInfo->totalBlocks);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
info.totalBlocks += pScanInfo->totalBlocks;
|
||||
info.loadBlocks += pScanInfo->loadBlocks;
|
||||
info.totalRows += pScanInfo->totalRows;
|
||||
info.skipBlocks += pScanInfo->skipBlocks;
|
||||
info.filterTime += pScanInfo->filterTime;
|
||||
info.loadBlockStatis += pScanInfo->loadBlockStatis;
|
||||
info.totalCheckedRows += pScanInfo->totalCheckedRows;
|
||||
info.filterOutBlocks += pScanInfo->filterOutBlocks;
|
||||
|
||||
EXPLAIN_ROW_APPEND("load_blocks=%d", pScanInfo->loadBlocks);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
|
||||
EXPLAIN_ROW_APPEND("load_block_SMAs=%d", pScanInfo->loadBlockStatis);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
|
||||
EXPLAIN_ROW_APPEND("total_rows=%" PRIu64, pScanInfo->totalRows);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
|
||||
EXPLAIN_ROW_APPEND("check_rows=%" PRIu64, pScanInfo->totalCheckedRows);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
if (pScanInfo->totalRows > totalRows) {
|
||||
totalRows = pScanInfo->totalRows;
|
||||
maxIndex = i;
|
||||
}
|
||||
}
|
||||
|
||||
EXPLAIN_ROW_APPEND("total_blocks=%.1f", ((double)info.totalBlocks) / nodeNum);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
|
||||
EXPLAIN_ROW_APPEND("load_blocks=%.1f", ((double)info.loadBlocks) / nodeNum);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
|
||||
EXPLAIN_ROW_APPEND("load_block_SMAs=%.1f", ((double)info.loadBlockStatis) / nodeNum);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
|
||||
EXPLAIN_ROW_APPEND("total_rows=%.1f", ((double)info.totalRows) / nodeNum);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
|
||||
EXPLAIN_ROW_APPEND("check_rows=%.1f", ((double)info.totalCheckedRows) / nodeNum);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_END();
|
||||
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
//Rows out: Avg 4166.7 rows x 24 workers. Max 4187 rows (seg7) with 0.220 ms to first row, 1.738 ms to end, start offset by 1.470 ms.
|
||||
SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, maxIndex);
|
||||
STableScanAnalyzeInfo *p1 = (STableScanAnalyzeInfo *)execInfo->verboseInfo;
|
||||
|
||||
EXPLAIN_ROW_NEW(level + 1, " ");
|
||||
EXPLAIN_ROW_APPEND("max_row_task=%d, total_rows:%" PRId64 ", ep:%s (cost=%.3f..%.3f)", maxIndex, p1->totalRows, "tbd",
|
||||
execInfo->startupCost, execInfo->totalCost);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_END();
|
||||
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
|
||||
|
|
|
@ -369,6 +369,8 @@ typedef struct SSysTableScanInfo {
|
|||
typedef struct SBlockDistInfo {
|
||||
SSDataBlock* pResBlock;
|
||||
void* pHandle;
|
||||
SReadHandle readHandle;
|
||||
uint64_t uid; // table uid
|
||||
} SBlockDistInfo;
|
||||
|
||||
// todo remove this
|
||||
|
@ -740,7 +742,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
|||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
|
||||
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SBlockDistScanPhysiNode* pBlockScanNode,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
|
||||
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup);
|
||||
|
|
|
@ -99,7 +99,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
|
|||
}
|
||||
|
||||
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
|
||||
if (msg == NULL || streamReadHandle == NULL) {
|
||||
if (msg == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -13,10 +13,10 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "tref.h"
|
||||
#include "dataSinkMgt.h"
|
||||
#include "os.h"
|
||||
#include "tmsg.h"
|
||||
#include "tref.h"
|
||||
#include "tudf.h"
|
||||
|
||||
#include "executor.h"
|
||||
|
@ -24,15 +24,13 @@
|
|||
#include "query.h"
|
||||
|
||||
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
|
||||
int32_t exchangeObjRefPool = -1;
|
||||
int32_t exchangeObjRefPool = -1;
|
||||
|
||||
static void initRefPool() {
|
||||
exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
|
||||
}
|
||||
static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); }
|
||||
|
||||
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
|
||||
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
|
||||
assert(readHandle != NULL && pSubplan != NULL);
|
||||
assert(pSubplan != NULL);
|
||||
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
|
||||
|
||||
taosThreadOnce(&initPoolOnce, initRefPool);
|
||||
|
@ -47,57 +45,57 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
||||
if (handle) {
|
||||
void* pSinkParam = NULL;
|
||||
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
||||
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
|
||||
}
|
||||
|
||||
_error:
|
||||
_error:
|
||||
// if failed to add ref for all tables in this query, abort current query
|
||||
return code;
|
||||
}
|
||||
|
||||
#ifdef TEST_IMPL
|
||||
// wait moment
|
||||
int waitMoment(SQInfo* pQInfo){
|
||||
if(pQInfo->sql) {
|
||||
int ms = 0;
|
||||
int waitMoment(SQInfo* pQInfo) {
|
||||
if (pQInfo->sql) {
|
||||
int ms = 0;
|
||||
char* pcnt = strstr(pQInfo->sql, " count(*)");
|
||||
if(pcnt) return 0;
|
||||
|
||||
if (pcnt) return 0;
|
||||
|
||||
char* pos = strstr(pQInfo->sql, " t_");
|
||||
if(pos){
|
||||
if (pos) {
|
||||
pos += 3;
|
||||
ms = atoi(pos);
|
||||
while(*pos >= '0' && *pos <= '9'){
|
||||
pos ++;
|
||||
while (*pos >= '0' && *pos <= '9') {
|
||||
pos++;
|
||||
}
|
||||
char unit_char = *pos;
|
||||
if(unit_char == 'h'){
|
||||
ms *= 3600*1000;
|
||||
} else if(unit_char == 'm'){
|
||||
ms *= 60*1000;
|
||||
} else if(unit_char == 's'){
|
||||
if (unit_char == 'h') {
|
||||
ms *= 3600 * 1000;
|
||||
} else if (unit_char == 'm') {
|
||||
ms *= 60 * 1000;
|
||||
} else if (unit_char == 's') {
|
||||
ms *= 1000;
|
||||
}
|
||||
}
|
||||
if(ms == 0) return 0;
|
||||
if (ms == 0) return 0;
|
||||
printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
|
||||
|
||||
if(ms < 1000) {
|
||||
|
||||
if (ms < 1000) {
|
||||
taosMsleep(ms);
|
||||
} else {
|
||||
int used_ms = 0;
|
||||
while(used_ms < ms) {
|
||||
while (used_ms < ms) {
|
||||
taosMsleep(1000);
|
||||
used_ms += 1000;
|
||||
if(isTaskKilled(pQInfo)){
|
||||
if (isTaskKilled(pQInfo)) {
|
||||
printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
|
||||
break;
|
||||
}
|
||||
|
@ -108,15 +106,14 @@ int waitMoment(SQInfo* pQInfo){
|
|||
}
|
||||
#endif
|
||||
|
||||
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
||||
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
int64_t threadId = taosGetSelfPthreadId();
|
||||
|
||||
*pRes = NULL;
|
||||
int64_t curOwner = 0;
|
||||
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
|
||||
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
|
||||
(void*)curOwner);
|
||||
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
|
||||
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
|
@ -152,18 +149,18 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
|||
|
||||
cleanUpUdfs();
|
||||
|
||||
int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
|
||||
int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
|
||||
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
|
||||
|
||||
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
|
||||
GET_TASKID(pTaskInfo), current, total, 0, el/1000.0);
|
||||
GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);
|
||||
|
||||
atomic_store_64(&pTaskInfo->owner, 0);
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
|
||||
int32_t qKillTask(qTaskInfo_t qinfo) {
|
||||
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
||||
|
||||
if (pTaskInfo == NULL) {
|
||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
|
@ -182,7 +179,7 @@ int32_t qKillTask(qTaskInfo_t qinfo) {
|
|||
}
|
||||
|
||||
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
|
||||
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
||||
|
||||
if (pTaskInfo == NULL) {
|
||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
|
@ -195,7 +192,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
|
|||
}
|
||||
|
||||
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
|
||||
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
||||
|
||||
if (pTaskInfo == NULL) {
|
||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
|
@ -205,18 +202,18 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
|
|||
}
|
||||
|
||||
void qDestroyTask(qTaskInfo_t qTaskHandle) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle;
|
||||
qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
|
||||
qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
|
||||
|
||||
queryCostStatis(pTaskInfo); // print the query cost summary
|
||||
queryCostStatis(pTaskInfo); // print the query cost summary
|
||||
doDestroyTask(pTaskInfo);
|
||||
}
|
||||
|
||||
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) {
|
||||
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo;
|
||||
int32_t capacity = 0;
|
||||
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
int32_t capacity = 0;
|
||||
|
||||
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum);
|
||||
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum);
|
||||
}
|
||||
|
||||
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
|
||||
|
|
|
@ -141,8 +141,8 @@ static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock,
|
|||
SqlFunctionCtx* pCtx, int32_t numOfExprs);
|
||||
|
||||
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
||||
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId);
|
||||
|
||||
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
|
||||
uint64_t groupId);
|
||||
|
||||
// setup the output buffer for each operator
|
||||
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
|
||||
|
@ -1230,8 +1230,9 @@ void initResultRow(SResultRow* pResultRow) {
|
|||
* offset[0] offset[1] offset[2]
|
||||
*/
|
||||
// TODO refactor: some function move away
|
||||
void setFunctionResultOutput(SOperatorInfo *pOperator, SOptrBasicInfo *pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
|
||||
int32_t numOfExprs) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
|
||||
int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
|
||||
|
||||
|
@ -1380,9 +1381,10 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
|
|||
}
|
||||
}
|
||||
|
||||
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId) {
|
||||
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
|
||||
uint64_t groupId) {
|
||||
// for simple group by query without interval, all the tables belong to one group result.
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
|
||||
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
|
||||
int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
|
||||
|
@ -2040,8 +2042,8 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
|
|||
}
|
||||
|
||||
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
|
||||
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
||||
SArray* pColList) {
|
||||
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
||||
SArray* pColList) {
|
||||
if (pColList == NULL) { // data from other sources
|
||||
blockCompressDecode(pRes, numOfOutput, numOfRows, pData);
|
||||
pRes->info.rows = numOfRows;
|
||||
|
@ -2165,8 +2167,9 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
|
|||
}
|
||||
|
||||
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
|
||||
code = extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
|
||||
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
|
||||
code =
|
||||
extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
|
||||
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
|
||||
if (code != 0) {
|
||||
taosMemoryFreeClear(pDataInfo->pRsp);
|
||||
goto _error;
|
||||
|
@ -2281,7 +2284,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
|
|||
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
|
||||
int32_t code =
|
||||
extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
|
||||
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
|
||||
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
|
||||
|
||||
if (pRsp->completed == 1) {
|
||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
|
||||
|
@ -2673,8 +2676,8 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize,
|
||||
numOfBufPage, pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)");
|
||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
|
||||
pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)");
|
||||
|
||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
|
||||
|
||||
|
@ -2812,7 +2815,8 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
|
|||
// todo add more information about exchange operation
|
||||
int32_t type = pOperator->operatorType;
|
||||
if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN ||
|
||||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN) {
|
||||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN ||
|
||||
type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN) {
|
||||
*order = TSDB_ORDER_ASC;
|
||||
*scanFlag = MAIN_SCAN;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2840,7 +2844,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
SAggOperatorInfo* pAggInfo = pOperator->info;
|
||||
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
@ -3130,7 +3133,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
SProjectOperatorInfo* pProjectInfo = pOperator->info;
|
||||
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
|
||||
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SSDataBlock* pRes = pInfo->pRes;
|
||||
blockDataCleanup(pRes);
|
||||
|
||||
|
@ -3403,8 +3406,8 @@ void cleanupAggSup(SAggSupporter* pAggSup) {
|
|||
destroyDiskbasedBuf(pAggSup->pResultBuf);
|
||||
}
|
||||
|
||||
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
size_t keyBufSize, const char* pkey) {
|
||||
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
|
||||
const char* pkey) {
|
||||
initExprSupp(pSup, pExprInfo, numOfCols);
|
||||
doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
|
@ -3457,12 +3460,12 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
initBasicInfo(&pInfo->binfo, pResultBlock);
|
||||
initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
|
||||
|
||||
pInfo->groupId = INT32_MIN;
|
||||
pOperator->name = "TableAggregate";
|
||||
pInfo->groupId = INT32_MIN;
|
||||
pOperator->name = "TableAggregate";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
|
||||
|
@ -3578,25 +3581,26 @@ static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols)
|
|||
return pList;
|
||||
}
|
||||
|
||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols);
|
||||
|
||||
SSDataBlock* pResBlock = createResDataBlock(pProjPhyNode->node.pOutputDataBlockDesc);
|
||||
SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset};
|
||||
SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset};
|
||||
|
||||
pInfo->limit = limit;
|
||||
pInfo->slimit = slimit;
|
||||
pInfo->curOffset = limit.offset;
|
||||
pInfo->curSOffset = slimit.offset;
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->limit = limit;
|
||||
pInfo->slimit = slimit;
|
||||
pInfo->curOffset = limit.offset;
|
||||
pInfo->curSOffset = slimit.offset;
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->pFilterNode = pProjPhyNode->node.pConditions;
|
||||
|
||||
int32_t numOfRows = 4096;
|
||||
|
@ -3614,12 +3618,12 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
|
|||
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
|
||||
|
||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
|
||||
pOperator->name = "ProjectOperator";
|
||||
pOperator->name = "ProjectOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
|
||||
destroyProjectOperatorInfo, NULL, NULL, NULL);
|
||||
|
@ -3639,7 +3643,7 @@ _error:
|
|||
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
||||
SIndefOperatorInfo* pIndefInfo = pOperator->info;
|
||||
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
SSDataBlock* pRes = pInfo->pRes;
|
||||
blockDataCleanup(pRes);
|
||||
|
@ -3677,7 +3681,7 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
|||
SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
|
||||
if (pScalarSup->pExprInfo != NULL) {
|
||||
code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
|
||||
pIndefInfo->pPseudoColInfo);
|
||||
pIndefInfo->pPseudoColInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
|
@ -3686,8 +3690,8 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
|||
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||
|
||||
code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pOperator->exprSupp.numOfExprs,
|
||||
pIndefInfo->pPseudoColInfo);
|
||||
code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pInfo->pRes, pBlock, pSup->pCtx,
|
||||
pOperator->exprSupp.numOfExprs, pIndefInfo->pPseudoColInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
|
@ -3745,14 +3749,14 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
|||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
||||
|
||||
pOperator->name = "IndefinitOperator";
|
||||
pOperator->name = "IndefinitOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = numOfExpr;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
|
||||
destroyIndefinitOperatorInfo, NULL, NULL, NULL);
|
||||
|
@ -3827,11 +3831,11 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
goto _error;
|
||||
}
|
||||
|
||||
int32_t num = 0;
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pTargets, NULL, &num);
|
||||
SInterval* pInterval = &((SIntervalAggOperatorInfo*)downstream->info)->interval;
|
||||
int32_t type = convertFillType(pPhyFillNode->mode);
|
||||
int32_t num = 0;
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pTargets, NULL, &num);
|
||||
SInterval* pInterval = &((SIntervalAggOperatorInfo*)downstream->info)->interval;
|
||||
int32_t type = convertFillType(pPhyFillNode->mode);
|
||||
|
||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
|
@ -3842,16 +3846,16 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->pRes = pResBlock;
|
||||
pInfo->pRes = pResBlock;
|
||||
pInfo->multigroupResult = multigroupResult;
|
||||
pOperator->name = "FillOperator";
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->name = "FillOperator";
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = num;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = num;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
|
||||
|
@ -4050,10 +4054,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
STimeWindowAggSupp twSup = {
|
||||
.waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
|
||||
tsdbReaderT pDataReader = NULL;
|
||||
if (pHandle->vnode) {
|
||||
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
|
||||
} else {
|
||||
getTableList(pHandle->meta, pScanPhyNode, pTableListInfo, pTagCond);
|
||||
|
||||
if (pHandle) {
|
||||
if (pHandle->vnode) {
|
||||
pDataReader =
|
||||
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
|
||||
} else {
|
||||
getTableList(pHandle->meta, pScanPhyNode, pTableListInfo, pTagCond);
|
||||
}
|
||||
}
|
||||
|
||||
if (pDataReader == NULL && terrno != 0) {
|
||||
|
@ -4087,6 +4095,46 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
}
|
||||
|
||||
return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
|
||||
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*) pPhyNode;
|
||||
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
|
||||
|
||||
if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
|
||||
int32_t code = tsdbGetAllTableList(pHandle->meta, pBlockNode->uid, pTableListInfo->pTableList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = terrno;
|
||||
return NULL;
|
||||
}
|
||||
} else { // Create one table group.
|
||||
STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid};
|
||||
taosArrayPush(pTableListInfo->pTableList, &info);
|
||||
}
|
||||
|
||||
SQueryTableDataCond cond = {0};
|
||||
|
||||
{
|
||||
cond.order = TSDB_ORDER_ASC;
|
||||
cond.numOfCols = 1;
|
||||
cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
|
||||
if (cond.colList == NULL) {
|
||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
cond.colList->colId = 1;
|
||||
cond.colList->type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
cond.colList->bytes = sizeof(TSKEY);
|
||||
|
||||
cond.numOfTWindows = 1;
|
||||
cond.twindows = taosMemoryCalloc(1, sizeof(STimeWindow));
|
||||
cond.twindows[0] = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||
cond.suid = pBlockNode->suid;
|
||||
cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
|
||||
}
|
||||
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId);
|
||||
cleanupQueryTableDataCond(&cond);
|
||||
|
||||
return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -4270,7 +4318,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
|
|||
SColumn c = {0};
|
||||
c.slotId = pNode->slotId;
|
||||
c.colId = pNode->slotId;
|
||||
c.type = pValNode->node.type;
|
||||
c.type = pValNode->node.type;
|
||||
c.bytes = pValNode->node.resType.bytes;
|
||||
c.scale = pValNode->node.resType.scale;
|
||||
c.precision = pValNode->node.resType.precision;
|
||||
|
@ -4284,8 +4332,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
|
|||
|
||||
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
||||
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) {
|
||||
int32_t code =
|
||||
getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond);
|
||||
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -4374,7 +4421,7 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) {
|
|||
ASSERT(length == *(int32_t*)result);
|
||||
|
||||
const char* data = result + sizeof(int32_t);
|
||||
code = ops->fpSet.decodeResultRow(ops, (char*) data);
|
||||
code = ops->fpSet.decodeResultRow(ops, (char*)data);
|
||||
if (code != TDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <vnode.h>
|
||||
#include "filter.h"
|
||||
#include "function.h"
|
||||
#include "functionMgt.h"
|
||||
|
@ -367,13 +368,14 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p
|
|||
|
||||
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
|
||||
if (isTaskKilled(pOperator->pTaskInfo)) {
|
||||
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
}
|
||||
|
||||
// process this data block based on the probabilities
|
||||
|
@ -396,7 +398,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
|||
continue;
|
||||
}
|
||||
|
||||
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
||||
uint64_t* groupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
||||
if (groupId) {
|
||||
pBlock->info.groupId = *groupId;
|
||||
}
|
||||
|
@ -525,7 +527,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
goto _error;
|
||||
}
|
||||
|
||||
//taosSsleep(20);
|
||||
// taosSsleep(20);
|
||||
|
||||
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
|
||||
|
||||
|
@ -589,44 +591,76 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
|
|||
pInfo->dataReader = pReadHandle;
|
||||
// pInfo->prevGroupId = -1;
|
||||
|
||||
pOperator->name = "TableSeqScanOperator";
|
||||
pOperator->name = "TableSeqScanOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL);
|
||||
return pOperator;
|
||||
}
|
||||
|
||||
static int32_t doGetTableRowSize(void* pMeta, uint64_t uid) {
|
||||
int32_t rowLen = 0;
|
||||
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pMeta, 0);
|
||||
metaGetTableEntryByUid(&mr, uid);
|
||||
if (mr.me.type == TSDB_SUPER_TABLE) {
|
||||
int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
|
||||
}
|
||||
} else if (mr.me.type == TSDB_CHILD_TABLE) {
|
||||
uint64_t suid = mr.me.ctbEntry.suid;
|
||||
metaGetTableEntryByUid(&mr, suid);
|
||||
int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
|
||||
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
|
||||
}
|
||||
} else if (mr.me.type == TSDB_NORMAL_TABLE) {
|
||||
int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
rowLen += mr.me.ntbEntry.schemaRow.pSchema[i].bytes;
|
||||
}
|
||||
}
|
||||
|
||||
metaReaderClear(&mr);
|
||||
return rowLen;
|
||||
}
|
||||
|
||||
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
SBlockDistInfo* pBlockScanInfo = pOperator->info;
|
||||
|
||||
STableBlockDistInfo blockDistInfo = {0};
|
||||
blockDistInfo.maxRows = INT_MIN;
|
||||
blockDistInfo.minRows = INT_MAX;
|
||||
STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN};
|
||||
blockDistInfo.rowSize = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid);
|
||||
|
||||
tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &blockDistInfo);
|
||||
blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader);
|
||||
tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
|
||||
blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle);
|
||||
|
||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||
pBlock->info.rows = 1;
|
||||
SSDataBlock* pBlock = pBlockScanInfo->pResBlock;
|
||||
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
int32_t slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId;
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo);
|
||||
char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
|
||||
tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
|
||||
varDataSetLen(p, len);
|
||||
|
||||
blockDataEnsureCapacity(pBlock, 1);
|
||||
colDataAppend(pColInfo, 0, p, false);
|
||||
taosMemoryFree(p);
|
||||
|
||||
pBlock->info.rows = 1;
|
||||
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return pBlock;
|
||||
}
|
||||
|
@ -636,7 +670,8 @@ static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
blockDataDestroy(pDistInfo->pResBlock);
|
||||
}
|
||||
|
||||
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
|
||||
SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) {
|
||||
SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -644,21 +679,20 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo*
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->pHandle = dataReader;
|
||||
pInfo->pHandle = dataReader;
|
||||
pInfo->readHandle = *readHandle;
|
||||
pInfo->uid = uid;
|
||||
pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc);
|
||||
|
||||
pInfo->pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
|
||||
initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
|
||||
|
||||
SColumnInfoData infoData = {0};
|
||||
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
|
||||
infoData.info.bytes = 1024;
|
||||
|
||||
taosArrayPush(pInfo->pResBlock->pDataBlock, &infoData);
|
||||
|
||||
pOperator->name = "DataBlockInfoScanOperator";
|
||||
// pOperator->operatorType = OP_TableBlockInfoScan;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->name = "DataBlockDistScanOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL,
|
||||
|
@ -706,9 +740,8 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
|
|||
SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup;
|
||||
int64_t gap = pInfo->sessionSup.gap;
|
||||
int32_t winIndex = 0;
|
||||
SResultWindowInfo* pCurWin =
|
||||
getSessionTimeWindow(pAggSup, tsCols[pInfo->updateResIndex], INT64_MIN,
|
||||
pSDB->info.groupId, gap, &winIndex);
|
||||
SResultWindowInfo* pCurWin =
|
||||
getSessionTimeWindow(pAggSup, tsCols[pInfo->updateResIndex], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
|
||||
win = pCurWin->win;
|
||||
pInfo->updateResIndex +=
|
||||
updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, pInfo->updateResIndex, gap, NULL);
|
||||
|
@ -789,23 +822,23 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
|
|||
if (!pResult) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
if (pResult->info.groupId == pInfo->groupId) {
|
||||
return pResult;
|
||||
}
|
||||
}
|
||||
|
||||
/* Todo(liuyao) for partition by column
|
||||
SSDataBlock* pBlock = createOneDataBlock(pResult, true);
|
||||
blockDataCleanup(pResult);
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, i);
|
||||
if (id == pInfo->groupId) {
|
||||
copyOneRow(pResult, pBlock, i);
|
||||
/* Todo(liuyao) for partition by column
|
||||
SSDataBlock* pBlock = createOneDataBlock(pResult, true);
|
||||
blockDataCleanup(pResult);
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, i);
|
||||
if (id == pInfo->groupId) {
|
||||
copyOneRow(pResult, pBlock, i);
|
||||
}
|
||||
}
|
||||
}
|
||||
return pResult;
|
||||
*/
|
||||
return pResult;
|
||||
*/
|
||||
}
|
||||
|
||||
static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
|
||||
|
@ -820,7 +853,7 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
|
|||
int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
|
||||
pInfo->groupId = getGroupId(pInfo->pOperatorDumy, pBlock, rowId);
|
||||
int32_t i = 0;
|
||||
for ( ; i < size; i++) {
|
||||
for (; i < size; i++) {
|
||||
rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
|
||||
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, rowId);
|
||||
if (pInfo->groupId != id) {
|
||||
|
@ -1050,9 +1083,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
|||
SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;
|
||||
|
||||
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
||||
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
||||
|
||||
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||
|
@ -1069,16 +1099,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
|||
}
|
||||
}
|
||||
|
||||
// set the extract column id to streamHandle
|
||||
tqReadHandleSetColIdList((STqReadHandle*)pHandle->reader, pColIds);
|
||||
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
|
||||
int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList);
|
||||
if (code != 0) {
|
||||
taosArrayDestroy(tableIdList);
|
||||
goto _error;
|
||||
}
|
||||
taosArrayDestroy(tableIdList);
|
||||
|
||||
pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
|
||||
if (pInfo->pBlockLists == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1090,10 +1110,31 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
|||
goto _error;
|
||||
}
|
||||
|
||||
if (pSTInfo->interval.interval > 0 && pDataReader) {
|
||||
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
|
||||
} else {
|
||||
pInfo->pUpdateInfo = NULL;
|
||||
if (pHandle) {
|
||||
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
||||
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
|
||||
if (pSTInfo->interval.interval > 0) {
|
||||
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
|
||||
} else {
|
||||
pInfo->pUpdateInfo = NULL;
|
||||
}
|
||||
pInfo->pOperatorDumy = pTableScanDummy;
|
||||
pInfo->interval = pSTInfo->interval;
|
||||
|
||||
pInfo->readHandle = *pHandle;
|
||||
ASSERT(pHandle->reader);
|
||||
pInfo->streamBlockReader = pHandle->reader;
|
||||
pInfo->tableUid = pScanPhyNode->uid;
|
||||
|
||||
// set the extract column id to streamHandle
|
||||
tqReadHandleSetColIdList((STqReadHandle*)pHandle->reader, pColIds);
|
||||
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
|
||||
int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList);
|
||||
if (code != 0) {
|
||||
taosArrayDestroy(tableIdList);
|
||||
goto _error;
|
||||
}
|
||||
taosArrayDestroy(tableIdList);
|
||||
}
|
||||
|
||||
// create the pseduo columns info
|
||||
|
@ -1101,19 +1142,14 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
|||
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
|
||||
}
|
||||
|
||||
pInfo->readHandle = *pHandle;
|
||||
pInfo->tableUid = pScanPhyNode->uid;
|
||||
pInfo->streamBlockReader = pHandle->reader;
|
||||
pInfo->pRes = createResDataBlock(pDescNode);
|
||||
pInfo->pUpdateRes = createResDataBlock(pDescNode);
|
||||
pInfo->pCondition = pScanPhyNode->node.pConditions;
|
||||
pInfo->pDataReader = pDataReader;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
pInfo->pOperatorDumy = pTableScanDummy;
|
||||
pInfo->interval = pSTInfo->interval;
|
||||
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
|
||||
pInfo->groupId = 0;
|
||||
|
||||
|
||||
pOperator->name = "StreamBlockScanOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
|
||||
pOperator->blocking = false;
|
||||
|
@ -1392,15 +1428,15 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
|||
|
||||
// table comment
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 8);
|
||||
if(pInfo->pCur->mr.me.ctbEntry.commentLen > 0) {
|
||||
if (pInfo->pCur->mr.me.ctbEntry.commentLen > 0) {
|
||||
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ctbEntry.comment);
|
||||
colDataAppend(pColInfoData, numOfRows, comment, false);
|
||||
}else if(pInfo->pCur->mr.me.ctbEntry.commentLen == 0) {
|
||||
} else if (pInfo->pCur->mr.me.ctbEntry.commentLen == 0) {
|
||||
char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(comment, "");
|
||||
colDataAppend(pColInfoData, numOfRows, comment, false);
|
||||
}else{
|
||||
} else {
|
||||
colDataAppendNULL(pColInfoData, numOfRows);
|
||||
}
|
||||
|
||||
|
@ -1428,15 +1464,15 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
|||
|
||||
// table comment
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 8);
|
||||
if(pInfo->pCur->mr.me.ntbEntry.commentLen > 0) {
|
||||
if (pInfo->pCur->mr.me.ntbEntry.commentLen > 0) {
|
||||
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ntbEntry.comment);
|
||||
colDataAppend(pColInfoData, numOfRows, comment, false);
|
||||
}else if(pInfo->pCur->mr.me.ntbEntry.commentLen == 0) {
|
||||
} else if (pInfo->pCur->mr.me.ntbEntry.commentLen == 0) {
|
||||
char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(comment, "");
|
||||
colDataAppend(pColInfoData, numOfRows, comment, false);
|
||||
}else{
|
||||
} else {
|
||||
colDataAppendNULL(pColInfoData, numOfRows);
|
||||
}
|
||||
|
||||
|
@ -1536,7 +1572,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
extractDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen,
|
||||
pOperator->exprSupp.numOfExprs, startTs, NULL, pInfo->scanCols);
|
||||
pOperator->exprSupp.numOfExprs, startTs, NULL, pInfo->scanCols);
|
||||
|
||||
// todo log the filter info
|
||||
doFilterResult(pInfo);
|
||||
|
@ -1833,7 +1869,10 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
|||
int32_t num = 0;
|
||||
int32_t numOfExprs = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
|
||||
SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
|
||||
SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
|
||||
|
||||
|
||||
initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
|
||||
|
||||
pInfo->pTableList = pTableListInfo;
|
||||
pInfo->pColMatchInfo = colList;
|
||||
|
@ -1842,13 +1881,12 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
|||
pInfo->curPos = 0;
|
||||
pInfo->pFilterNode = pPhyNode->node.pConditions;
|
||||
|
||||
pOperator->name = "TagScanOperator";
|
||||
pOperator->name = "TagScanOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
||||
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = numOfExprs;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
|
@ -1918,8 +1956,7 @@ typedef struct STableMergeScanInfo {
|
|||
int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
||||
STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId,
|
||||
uint64_t taskId, SNode* pTagCond) {
|
||||
int32_t code =
|
||||
getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond);
|
||||
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -1956,7 +1993,7 @@ _error:
|
|||
|
||||
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
|
||||
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
STableMergeScanInfo* pInfo = pOperator->info;
|
||||
|
||||
SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
|
||||
|
@ -2143,9 +2180,8 @@ int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) {
|
|||
|
||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||
|
||||
pInfo->pSortHandle =
|
||||
tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize,
|
||||
numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str);
|
||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
|
||||
pInfo->pSortInputBlock, pTaskInfo->id.str);
|
||||
|
||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL);
|
||||
|
||||
|
@ -2209,8 +2245,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
|||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
SSDataBlock* pBlock =
|
||||
getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
|
||||
SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
|
||||
|
||||
if (pBlock != NULL) {
|
||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
|
@ -2243,20 +2278,20 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
|
||||
typedef struct STableMergeScanExecInfo {
|
||||
SFileBlockLoadRecorder blockRecorder;
|
||||
SSortExecInfo sortExecInfo;
|
||||
SSortExecInfo sortExecInfo;
|
||||
} STableMergeScanExecInfo;
|
||||
|
||||
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
||||
ASSERT(pOptr != NULL);
|
||||
// TODO: merge these two info into one struct
|
||||
STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
|
||||
STableMergeScanInfo* pInfo = pOptr->info;
|
||||
STableMergeScanInfo* pInfo = pOptr->info;
|
||||
execInfo->blockRecorder = pInfo->readRecorder;
|
||||
execInfo->sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
|
||||
|
||||
*pOptrExplain = execInfo;
|
||||
*len = sizeof(STableMergeScanExecInfo);
|
||||
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -2271,8 +2306,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SArray* pColList =
|
||||
extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||
SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||
|
||||
int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2286,16 +2320,16 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
|
||||
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
|
||||
|
||||
pInfo->readHandle = *readHandle;
|
||||
pInfo->interval = extractIntervalInfo(pTableScanNode);
|
||||
pInfo->readHandle = *readHandle;
|
||||
pInfo->interval = extractIntervalInfo(pTableScanNode);
|
||||
pInfo->sample.sampleRatio = pTableScanNode->ratio;
|
||||
pInfo->sample.seed = taosGetTimestampSec();
|
||||
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
|
||||
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
|
||||
pInfo->dataReaders = dataReaders;
|
||||
pInfo->scanFlag = MAIN_SCAN;
|
||||
pInfo->pColMatchInfo = pColList;
|
||||
pInfo->curTWinIdx = 0;
|
||||
pInfo->sample.seed = taosGetTimestampSec();
|
||||
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
|
||||
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
|
||||
pInfo->dataReaders = dataReaders;
|
||||
pInfo->scanFlag = MAIN_SCAN;
|
||||
pInfo->pColMatchInfo = pColList;
|
||||
pInfo->curTWinIdx = 0;
|
||||
|
||||
pInfo->pResBlock = createResDataBlock(pDescNode);
|
||||
|
||||
|
@ -2313,22 +2347,22 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||
|
||||
int32_t rowSize = pInfo->pResBlock->info.rowSize;
|
||||
pInfo->bufPageSize = getProperSortPageSize(rowSize);
|
||||
pInfo->bufPageSize = getProperSortPageSize(rowSize);
|
||||
|
||||
// todo the total available buffer should be determined by total capacity of buffer of this task.
|
||||
// the additional one is reserved for merge result
|
||||
pInfo->sortBufSize = pInfo->bufPageSize * (taosArrayGetSize(dataReaders) + 1);
|
||||
pInfo->hasGroupId = false;
|
||||
pInfo->sortBufSize = pInfo->bufPageSize * (taosArrayGetSize(dataReaders) + 1);
|
||||
pInfo->hasGroupId = false;
|
||||
pInfo->prefetchedTuple = NULL;
|
||||
|
||||
pOperator->name = "TableMergeScanOperator";
|
||||
pOperator->name = "TableMergeScanOperator";
|
||||
// TODO : change it
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
initResultSizeInfo(pOperator, 1024);
|
||||
|
||||
pOperator->fpSet =
|
||||
|
|
|
@ -192,6 +192,8 @@ int32_t twaFunction(SqlFunctionCtx *pCtx);
|
|||
int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
|
||||
|
||||
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
|
||||
bool blockDistSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t blockDistFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
||||
|
|
|
@ -419,7 +419,7 @@ static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t topCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) {
|
||||
int32_t topBotCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) {
|
||||
int32_t code = nodesListMakeAppend(pParameters, pPartialRes);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListStrictAppend(*pParameters, nodesCloneNode(nodesListGetNode(pRawParameters, 1)));
|
||||
|
@ -427,65 +427,6 @@ int32_t topCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeL
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
|
||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||
|
||||
if (isPartial) {
|
||||
if (2 != numOfParams) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
|
||||
if (!IS_NUMERIC_TYPE(para1Type) || !IS_INTEGER_TYPE(para2Type)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// param1
|
||||
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
|
||||
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
SValueNode* pValue = (SValueNode*)pParamNode1;
|
||||
if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
if (pValue->datum.i < 1 || pValue->datum.i > 100) {
|
||||
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
pValue->notReserved = true;
|
||||
|
||||
// set result type
|
||||
pFunc->node.resType =
|
||||
(SDataType){.bytes = getTopBotInfoSize(pValue->datum.i) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
|
||||
} else {
|
||||
if (1 != numOfParams) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
if (TSDB_DATA_TYPE_BINARY != para1Type) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// Do nothing. We can only access output of partial functions as input,
|
||||
// so original input type cannot be obtained, resType will be set same
|
||||
// as original function input type after merge function created.
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateTopBotPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
return translateTopBotImpl(pFunc, pErrBuf, len, true);
|
||||
}
|
||||
|
||||
static int32_t translateTopBotMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
return translateTopBotImpl(pFunc, pErrBuf, len, false);
|
||||
}
|
||||
|
||||
static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
|
@ -1735,31 +1676,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.processFunc = topFunction,
|
||||
.finalizeFunc = topBotFinalize,
|
||||
.combineFunc = topCombine,
|
||||
.pPartialFunc = "_top_partial",
|
||||
.pMergeFunc = "_top_merge",
|
||||
// .createMergeParaFuc = topCreateMergePara
|
||||
},
|
||||
{
|
||||
.name = "_top_partial",
|
||||
.type = FUNCTION_TYPE_TOP_PARTIAL,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
||||
.translateFunc = translateTopBotPartial,
|
||||
.getEnvFunc = getTopBotFuncEnv,
|
||||
.initFunc = topBotFunctionSetup,
|
||||
.processFunc = topFunction,
|
||||
.finalizeFunc = topBotPartialFinalize,
|
||||
.combineFunc = topCombine,
|
||||
},
|
||||
{
|
||||
.name = "_top_merge",
|
||||
.type = FUNCTION_TYPE_TOP_MERGE,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
||||
.translateFunc = translateTopBotMerge,
|
||||
.getEnvFunc = getTopBotMergeFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = topFunctionMerge,
|
||||
.finalizeFunc = topBotMergeFinalize,
|
||||
.combineFunc = topCombine,
|
||||
.pPartialFunc = "top",
|
||||
.pMergeFunc = "top",
|
||||
.createMergeParaFuc = topBotCreateMergePara
|
||||
},
|
||||
{
|
||||
.name = "bottom",
|
||||
|
@ -1771,30 +1690,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.processFunc = bottomFunction,
|
||||
.finalizeFunc = topBotFinalize,
|
||||
.combineFunc = bottomCombine,
|
||||
.pPartialFunc = "_bottom_partial",
|
||||
.pMergeFunc = "_bottom_merge"
|
||||
},
|
||||
{
|
||||
.name = "_bottom_partial",
|
||||
.type = FUNCTION_TYPE_BOTTOM_PARTIAL,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
||||
.translateFunc = translateTopBotPartial,
|
||||
.getEnvFunc = getTopBotFuncEnv,
|
||||
.initFunc = topBotFunctionSetup,
|
||||
.processFunc = bottomFunction,
|
||||
.finalizeFunc = topBotPartialFinalize,
|
||||
.combineFunc = bottomCombine,
|
||||
},
|
||||
{
|
||||
.name = "_bottom_merge",
|
||||
.type = FUNCTION_TYPE_BOTTOM_MERGE,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
||||
.translateFunc = translateTopBotMerge,
|
||||
.getEnvFunc = getTopBotMergeFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = bottomFunctionMerge,
|
||||
.finalizeFunc = topBotMergeFinalize,
|
||||
.combineFunc = bottomCombine,
|
||||
.pPartialFunc = "bottom",
|
||||
.pMergeFunc = "bottom",
|
||||
.createMergeParaFuc = topBotCreateMergePara
|
||||
},
|
||||
{
|
||||
.name = "spread",
|
||||
|
@ -2524,7 +2422,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getSelectivityFuncEnv, // todo remove this function later.
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = NULL,
|
||||
.finalizeFunc = NULL
|
||||
.finalizeFunc = NULL,
|
||||
.pPartialFunc = "_select_value",
|
||||
.pMergeFunc = "_select_value"
|
||||
},
|
||||
{
|
||||
.name = "_block_dist",
|
||||
|
@ -2532,6 +2432,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.translateFunc = translateBlockDistFunc,
|
||||
.getEnvFunc = getBlockDistFuncEnv,
|
||||
.initFunc = blockDistSetup,
|
||||
.processFunc = blockDistFunction,
|
||||
.finalizeFunc = blockDistFinalize
|
||||
},
|
||||
|
|
|
@ -67,8 +67,7 @@ typedef struct STopBotResItem {
|
|||
|
||||
typedef struct STopBotRes {
|
||||
int32_t maxSize;
|
||||
int16_t type; // store the original input type, used in merge function
|
||||
int32_t numOfItems;
|
||||
int16_t type;
|
||||
STopBotResItem* pItems;
|
||||
} STopBotRes;
|
||||
|
||||
|
@ -2872,12 +2871,6 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool getTopBotMergeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
// intermediate result is binary and length contains VAR header size
|
||||
pEnv->calcMemSize = pFunc->node.resType.bytes;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
|
||||
if (!functionSetup(pCtx, pResInfo)) {
|
||||
return false;
|
||||
|
@ -2952,50 +2945,6 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void topBotTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput, bool isTopQuery) {
|
||||
for (int32_t i = 0; i < pInput->numOfItems; i++) {
|
||||
addResult(pCtx, &pInput->pItems[i], pInput->type, isTopQuery);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t topFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
char* data = colDataGetData(pCol, start);
|
||||
STopBotRes* pInputInfo = (STopBotRes*)varDataVal(data);
|
||||
STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
||||
pInfo->maxSize = pInputInfo->maxSize;
|
||||
pInfo->type = pInputInfo->type;
|
||||
topBotTransferInfo(pCtx, pInputInfo, true);
|
||||
SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t bottomFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
char* data = colDataGetData(pCol, start);
|
||||
STopBotRes* pInputInfo = (STopBotRes*)varDataVal(data);
|
||||
STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
||||
pInfo->maxSize = pInputInfo->maxSize;
|
||||
pInfo->type = pInputInfo->type;
|
||||
topBotTransferInfo(pCtx, pInputInfo, false);
|
||||
SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t topBotResComparFn(const void* p1, const void* p2, const void* param) {
|
||||
uint16_t type = *(uint16_t*)param;
|
||||
|
||||
|
@ -3044,8 +2993,6 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
|||
|
||||
// allocate the buffer and keep the data of this row into the new allocated buffer
|
||||
pEntryInfo->numOfRes++;
|
||||
// accumulate number of items for each vgroup, this info is needed for merge
|
||||
pRes->numOfItems++;
|
||||
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
|
||||
!isTopQuery);
|
||||
} else { // replace the minimum value in the result
|
||||
|
@ -3144,7 +3091,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
|||
releaseBufPage(pCtx->pBuf, pPage);
|
||||
}
|
||||
|
||||
int32_t topBotFinalizeImpl(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, bool isMerge) {
|
||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
|
||||
|
@ -3164,39 +3111,13 @@ int32_t topBotFinalizeImpl(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, bool isMer
|
|||
colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false);
|
||||
}
|
||||
|
||||
if (!isMerge) {
|
||||
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
|
||||
}
|
||||
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
|
||||
currentRow += 1;
|
||||
}
|
||||
|
||||
return pEntryInfo->numOfRes;
|
||||
}
|
||||
|
||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return topBotFinalizeImpl(pCtx, pBlock, false); }
|
||||
|
||||
int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
return topBotFinalizeImpl(pCtx, pBlock, true);
|
||||
}
|
||||
|
||||
int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
STopBotRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
int32_t resultBytes = getTopBotInfoSize(pRes->maxSize);
|
||||
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||
|
||||
memcpy(varDataVal(res), pRes, resultBytes);
|
||||
varDataSetLen(res, resultBytes);
|
||||
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
colDataAppend(pCol, pBlock->info.rows, res, false);
|
||||
|
||||
taosMemoryFree(res);
|
||||
return 1;
|
||||
}
|
||||
|
||||
void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
||||
|
@ -3211,8 +3132,6 @@ void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type,
|
|||
pItem->tuplePos.pageId = -1;
|
||||
replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos);
|
||||
pEntryInfo->numOfRes++;
|
||||
// accumulate number of items for each vgroup, this info is needed for merge
|
||||
pRes->numOfItems++;
|
||||
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
|
||||
!isTopQuery);
|
||||
} else { // replace the minimum value in the result
|
||||
|
@ -5004,7 +4923,19 @@ int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return functionFinalize(pCtx, pBlock);
|
||||
}
|
||||
|
||||
bool blockDistSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
if (!functionSetup(pCtx, pResultInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
STableBlockDistInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
pInfo->minRows = INT32_MAX;
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
|
||||
const int32_t BLOCK_DIST_RESULT_ROWS = 24;
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
|
||||
|
@ -5022,6 +4953,11 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
|
|||
pDistInfo->totalRows += p1.totalRows;
|
||||
pDistInfo->numOfFiles += p1.numOfFiles;
|
||||
|
||||
pDistInfo->defMinRows = p1.defMinRows;
|
||||
pDistInfo->defMaxRows = p1.defMaxRows;
|
||||
pDistInfo->rowSize = p1.rowSize;
|
||||
pDistInfo->numOfSmallBlocks = p1.numOfSmallBlocks;
|
||||
|
||||
if (pDistInfo->minRows > p1.minRows) {
|
||||
pDistInfo->minRows = p1.minRows;
|
||||
}
|
||||
|
@ -5033,7 +4969,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
|
|||
pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i];
|
||||
}
|
||||
|
||||
pResInfo->numOfRes = 1;
|
||||
pResInfo->numOfRes = BLOCK_DIST_RESULT_ROWS; // default output rows
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -5045,7 +4981,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist
|
|||
if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1;
|
||||
|
||||
if (tEncodeU16(&encoder, pInfo->numOfFiles) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pInfo->numOfBlocks) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pInfo->numOfTables) < 0) return -1;
|
||||
|
||||
if (tEncodeU64(&encoder, pInfo->totalSize) < 0) return -1;
|
||||
|
@ -5076,7 +5012,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
|
|||
if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1;
|
||||
|
||||
if (tDecodeU16(&decoder, &pInfo->numOfFiles) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pInfo->numOfBlocks) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pInfo->numOfTables) < 0) return -1;
|
||||
|
||||
if (tDecodeU64(&decoder, &pInfo->totalSize) < 0) return -1;
|
||||
|
@ -5098,32 +5034,29 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
|
|||
|
||||
int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
char* pData = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
STableBlockDistInfo* pData = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
|
||||
int32_t row = 0;
|
||||
|
||||
STableBlockDistInfo info = {0};
|
||||
tDeserializeBlockDistInfo(varDataVal(pData), varDataLen(pData), &info);
|
||||
|
||||
char st[256] = {0};
|
||||
double totalRawSize = pData->totalRows * pData->rowSize;
|
||||
int32_t len =
|
||||
sprintf(st + VARSTR_HEADER_SIZE, "Blocks=[%d] Size=[%.3fKb] Average_Block_size=[%.3fKb] Compression_Ratio=[%.3f]",
|
||||
info.numOfBlocks, info.totalSize / 1024.0, info.totalSize / (info.numOfBlocks * 1024.0),
|
||||
info.totalSize / (info.totalRows * info.rowSize * 1.0));
|
||||
sprintf(st + VARSTR_HEADER_SIZE, "Total_Blocks=[%d] Total_Size=[%.2f Kb] Average_size=[%.2f Kb] Compression_Ratio=[%.2f %c]",
|
||||
pData->numOfBlocks, pData->totalSize / 1024.0, ((double)pData->totalSize) / pData->numOfBlocks,
|
||||
pData->totalSize * 100 / totalRawSize, '%');
|
||||
|
||||
varDataSetLen(st, len);
|
||||
colDataAppend(pColInfo, row++, st, false);
|
||||
|
||||
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Rows=[%ld] MinRows=[%d] MaxRows=[%d] Averge_Rows=[%ld] Inmem_Rows=[%d]",
|
||||
info.totalRows, info.minRows, info.maxRows, info.totalRows / info.numOfBlocks, info.numOfInmemRows);
|
||||
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Rows=[%"PRId64"] Inmem_Rows=[%d] MinRows=[%d] MaxRows=[%d] Average_Rows=[%"PRId64"]",
|
||||
pData->totalRows, pData->numOfInmemRows, pData->minRows, pData->maxRows, pData->totalRows / pData->numOfBlocks);
|
||||
|
||||
varDataSetLen(st, len);
|
||||
colDataAppend(pColInfo, row++, st, false);
|
||||
|
||||
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", info.numOfTables,
|
||||
info.numOfFiles, 0);
|
||||
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", pData->numOfTables,
|
||||
pData->numOfFiles, 0);
|
||||
|
||||
varDataSetLen(st, len);
|
||||
colDataAppend(pColInfo, row++, st, false);
|
||||
|
@ -5135,40 +5068,56 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
|
||||
int32_t maxVal = 0;
|
||||
int32_t minVal = INT32_MAX;
|
||||
for (int32_t i = 0; i < sizeof(info.blockRowsHisto) / sizeof(info.blockRowsHisto[0]); ++i) {
|
||||
if (maxVal < info.blockRowsHisto[i]) {
|
||||
maxVal = info.blockRowsHisto[i];
|
||||
for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) {
|
||||
if (maxVal < pData->blockRowsHisto[i]) {
|
||||
maxVal = pData->blockRowsHisto[i];
|
||||
}
|
||||
|
||||
if (minVal > info.blockRowsHisto[i]) {
|
||||
minVal = info.blockRowsHisto[i];
|
||||
if (minVal > pData->blockRowsHisto[i]) {
|
||||
minVal = pData->blockRowsHisto[i];
|
||||
}
|
||||
}
|
||||
|
||||
int32_t delta = maxVal - minVal;
|
||||
int32_t step = delta / 50;
|
||||
if (step == 0) {
|
||||
step = 1;
|
||||
}
|
||||
|
||||
int32_t numOfBuckets = sizeof(info.blockRowsHisto) / sizeof(info.blockRowsHisto[0]);
|
||||
int32_t bucketRange = (info.maxRows - info.minRows) / numOfBuckets;
|
||||
int32_t numOfBuckets = sizeof(pData->blockRowsHisto) / sizeof(pData->blockRowsHisto[0]);
|
||||
int32_t bucketRange = (pData->maxRows - pData->minRows) / numOfBuckets;
|
||||
|
||||
for (int32_t i = 0; i < 20; ++i) {
|
||||
len += sprintf(st + VARSTR_HEADER_SIZE, "%04d |", info.defMinRows + bucketRange * (i + 1));
|
||||
bool singleModel = false;
|
||||
if (bucketRange == 0) {
|
||||
singleModel = true;
|
||||
step = 20;
|
||||
bucketRange = (pData->defMaxRows - pData->defMinRows) / numOfBuckets;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) {
|
||||
len = sprintf(st + VARSTR_HEADER_SIZE, "%04d |", pData->defMinRows + bucketRange * (i + 1));
|
||||
|
||||
int32_t num = 0;
|
||||
if (singleModel && pData->blockRowsHisto[i] > 0) {
|
||||
num = 20;
|
||||
} else {
|
||||
num = (pData->blockRowsHisto[i] + step - 1) / step;
|
||||
}
|
||||
|
||||
int32_t num = (info.blockRowsHisto[i] + step - 1) / step;
|
||||
for (int32_t j = 0; j < num; ++j) {
|
||||
int32_t x = sprintf(st + VARSTR_HEADER_SIZE + len, "%c", '|');
|
||||
len += x;
|
||||
}
|
||||
|
||||
double v = info.blockRowsHisto[i] * 100.0 / info.numOfBlocks;
|
||||
len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.3f%c)", info.blockRowsHisto[i], v, '%');
|
||||
double v = pData->blockRowsHisto[i] * 100.0 / pData->numOfBlocks;
|
||||
len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.2f%c)", pData->blockRowsHisto[i], v, '%');
|
||||
printf("%s\n", st);
|
||||
|
||||
varDataSetLen(st, len);
|
||||
colDataAppend(pColInfo, row++, st, false);
|
||||
}
|
||||
|
||||
return row;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef struct SDerivInfo {
|
||||
|
|
|
@ -419,6 +419,24 @@ static int32_t collectMetaKeyFromDelete(SCollectMetaKeyCxt* pCxt, SDeleteStmt* p
|
|||
return collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pFromTable, AUTH_TYPE_WRITE);
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromShowBlockDist(SCollectMetaKeyCxt* pCxt, SShowTableDistributedStmt* pStmt) {
|
||||
SName name = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId};
|
||||
strcpy(name.dbname, pStmt->dbName);
|
||||
strcpy(name.tname, pStmt->tableName);
|
||||
int32_t code = catalogRemoveTableMeta(pCxt->pParseCxt->pCatalog, &name);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
|
||||
pCxt->pStmt = pStmt;
|
||||
switch (nodeType(pStmt)) {
|
||||
|
@ -497,6 +515,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
|
|||
return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt);
|
||||
case QUERY_NODE_DELETE_STMT:
|
||||
return collectMetaKeyFromDelete(pCxt, (SDeleteStmt*)pStmt);
|
||||
case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT:
|
||||
return collectMetaKeyFromShowBlockDist(pCxt, (SShowTableDistributedStmt*)pStmt);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -36,11 +36,11 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
|||
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pTask->inputType) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
|
||||
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pTask->dataScan) < 0) return -1;
|
||||
|
||||
if (tEncodeI32(pEncoder, pTask->childId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
|
||||
|
@ -84,11 +84,11 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pTask->inputType) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
|
||||
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pTask->dataScan) < 0) return -1;
|
||||
|
||||
if (tDecodeI32(pDecoder, &pTask->childId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
|
||||
|
|
Loading…
Reference in New Issue