feat(stream): support snode
This commit is contained in:
parent
384b02d2a2
commit
53ef66961a
|
@ -88,9 +88,9 @@ int32_t create_stream() {
|
||||||
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
|
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
|
||||||
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
||||||
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
||||||
pRes = taos_query(pConn,
|
pRes = taos_query(
|
||||||
"create stream stream1 trigger at_once into abc2.outstb as select _wstartts, sum(k) from st1 "
|
pConn,
|
||||||
"partition by tbname interval(10m) ");
|
"create stream stream1 trigger at_once into abc1.outstb as select _wstartts, sum(k) from st1 interval(10m) ");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -107,11 +107,4 @@ int main(int argc, char* argv[]) {
|
||||||
code = init_env();
|
code = init_env();
|
||||||
}
|
}
|
||||||
create_stream();
|
create_stream();
|
||||||
#if 0
|
|
||||||
tmq_t* tmq = build_consumer();
|
|
||||||
tmq_list_t* topic_list = build_topic_list();
|
|
||||||
/*perf_loop(tmq, topic_list);*/
|
|
||||||
/*basic_consume_loop(tmq, topic_list);*/
|
|
||||||
sync_consume_loop(tmq, topic_list);
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,6 @@ typedef enum {
|
||||||
WRITE_QUEUE,
|
WRITE_QUEUE,
|
||||||
APPLY_QUEUE,
|
APPLY_QUEUE,
|
||||||
SYNC_QUEUE,
|
SYNC_QUEUE,
|
||||||
MERGE_QUEUE,
|
|
||||||
QUEUE_MAX,
|
QUEUE_MAX,
|
||||||
} EQueueType;
|
} EQueueType;
|
||||||
|
|
||||||
|
|
|
@ -16,8 +16,8 @@
|
||||||
#ifndef _TD_SNODE_H_
|
#ifndef _TD_SNODE_H_
|
||||||
#define _TD_SNODE_H_
|
#define _TD_SNODE_H_
|
||||||
|
|
||||||
#include "tmsgcb.h"
|
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
|
#include "tmsgcb.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
@ -68,8 +68,8 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad);
|
||||||
* @param pMsg The request message
|
* @param pMsg The request message
|
||||||
* @param pRsp The response message
|
* @param pRsp The response message
|
||||||
*/
|
*/
|
||||||
void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg);
|
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg);
|
||||||
void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg);
|
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,7 +36,6 @@ typedef struct SPlanContext {
|
||||||
int64_t watermark;
|
int64_t watermark;
|
||||||
char* pMsg;
|
char* pMsg;
|
||||||
int32_t msgLen;
|
int32_t msgLen;
|
||||||
// double filesFactor;
|
|
||||||
} SPlanContext;
|
} SPlanContext;
|
||||||
|
|
||||||
// Create the physical plan for the query, according to the AST.
|
// Create the physical plan for the query, according to the AST.
|
||||||
|
|
|
@ -152,7 +152,7 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char* qmsg;
|
char* qmsg;
|
||||||
// followings are not applicable to encoder and decoder
|
// followings are not applicable to encoder and decoder
|
||||||
void* inputHandle;
|
// void* inputHandle;
|
||||||
void* executor;
|
void* executor;
|
||||||
} STaskExec;
|
} STaskExec;
|
||||||
|
|
||||||
|
@ -240,12 +240,13 @@ struct SStreamTask {
|
||||||
int8_t inputType;
|
int8_t inputType;
|
||||||
int8_t status;
|
int8_t status;
|
||||||
|
|
||||||
int8_t sourceType;
|
|
||||||
int8_t execType;
|
int8_t execType;
|
||||||
int8_t sinkType;
|
int8_t sinkType;
|
||||||
int8_t dispatchType;
|
int8_t dispatchType;
|
||||||
int16_t dispatchMsgType;
|
int16_t dispatchMsgType;
|
||||||
|
|
||||||
|
int8_t dataScan;
|
||||||
|
|
||||||
// node info
|
// node info
|
||||||
int32_t childId;
|
int32_t childId;
|
||||||
int32_t nodeId;
|
int32_t nodeId;
|
||||||
|
|
|
@ -95,9 +95,12 @@ SArray *smGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
_OVER:
|
_OVER:
|
||||||
|
|
|
@ -55,7 +55,9 @@ static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t num
|
||||||
taosGetQitem(qall, (void **)&pMsg);
|
taosGetQitem(qall, (void **)&pMsg);
|
||||||
|
|
||||||
dTrace("msg:%p, get from snode-unique queue", pMsg);
|
dTrace("msg:%p, get from snode-unique queue", pMsg);
|
||||||
sndProcessUMsg(pMgmt->pSnode, pMsg);
|
if (sndProcessUMsg(pMgmt->pSnode, pMsg) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
dTrace("msg:%p, is freed", pMsg);
|
dTrace("msg:%p, is freed", pMsg);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
@ -67,7 +69,9 @@ static void smProcessSharedQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
SSnodeMgmt *pMgmt = pInfo->ahandle;
|
SSnodeMgmt *pMgmt = pInfo->ahandle;
|
||||||
|
|
||||||
dTrace("msg:%p, get from snode-shared queue", pMsg);
|
dTrace("msg:%p, get from snode-shared queue", pMsg);
|
||||||
sndProcessSMsg(pMgmt->pSnode, pMsg);
|
if (sndProcessSMsg(pMgmt->pSnode, pMsg) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
dTrace("msg:%p, is freed", pMsg);
|
dTrace("msg:%p, is freed", pMsg);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
|
|
@ -169,10 +169,6 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
||||||
dTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
|
dTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
|
||||||
taosWriteQitem(pVnode->pSyncQ, pMsg);
|
taosWriteQitem(pVnode->pSyncQ, pMsg);
|
||||||
break;
|
break;
|
||||||
case MERGE_QUEUE:
|
|
||||||
dTrace("vgId:%d, msg:%p put into vnode-merge queue", pVnode->vgId, pMsg);
|
|
||||||
taosWriteQitem(pVnode->pMergeQ, pMsg);
|
|
||||||
break;
|
|
||||||
case APPLY_QUEUE:
|
case APPLY_QUEUE:
|
||||||
dTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg);
|
dTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg);
|
||||||
taosWriteQitem(pVnode->pApplyQ, pMsg);
|
taosWriteQitem(pVnode->pApplyQ, pMsg);
|
||||||
|
@ -195,8 +191,6 @@ int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsg
|
||||||
|
|
||||||
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); }
|
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); }
|
||||||
|
|
||||||
int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, MERGE_QUEUE); }
|
|
||||||
|
|
||||||
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
dTrace("msg:%p, put into vnode-mgmt queue", pMsg);
|
dTrace("msg:%p, put into vnode-mgmt queue", pMsg);
|
||||||
taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);
|
taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);
|
||||||
|
@ -242,9 +236,6 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
||||||
case FETCH_QUEUE:
|
case FETCH_QUEUE:
|
||||||
size = taosQueueItemSize(pVnode->pFetchQ);
|
size = taosQueueItemSize(pVnode->pFetchQ);
|
||||||
break;
|
break;
|
||||||
case MERGE_QUEUE:
|
|
||||||
size = taosQueueItemSize(pVnode->pMergeQ);
|
|
||||||
break;
|
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,9 +63,8 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64
|
||||||
.topicQuery = false,
|
.topicQuery = false,
|
||||||
.streamQuery = true,
|
.streamQuery = true,
|
||||||
.rSmaQuery = true,
|
.rSmaQuery = true,
|
||||||
.triggerType = STREAM_TRIGGER_AT_ONCE,
|
.triggerType = triggerType,
|
||||||
.watermark = watermark,
|
.watermark = watermark,
|
||||||
/*.filesFactor = filesFactor,*/
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
||||||
|
@ -270,7 +269,6 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb
|
||||||
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
|
|
||||||
// source
|
// source
|
||||||
pTask->sourceType = TASK_SOURCE__MERGE;
|
|
||||||
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
|
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
|
||||||
|
|
||||||
// exec
|
// exec
|
||||||
|
@ -316,7 +314,6 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj*
|
||||||
#endif
|
#endif
|
||||||
pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);
|
pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);
|
||||||
// source
|
// source
|
||||||
pTask->sourceType = TASK_SOURCE__MERGE;
|
|
||||||
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
|
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
|
||||||
|
|
||||||
// exec
|
// exec
|
||||||
|
@ -427,6 +424,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
||||||
mndAddTaskToTaskSet(taskSourceLevel, pTask);
|
mndAddTaskToTaskSet(taskSourceLevel, pTask);
|
||||||
|
|
||||||
|
pTask->dataScan = 1;
|
||||||
|
|
||||||
// input
|
// input
|
||||||
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
|
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
|
||||||
|
|
||||||
|
@ -470,6 +469,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
||||||
mndAddTaskToTaskSet(taskOneLevel, pTask);
|
mndAddTaskToTaskSet(taskOneLevel, pTask);
|
||||||
|
|
||||||
|
pTask->dataScan = 1;
|
||||||
|
|
||||||
// input
|
// input
|
||||||
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
|
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
|
||||||
|
|
||||||
|
|
|
@ -56,7 +56,6 @@ SStreamTask* sndMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
|
||||||
int32_t sndMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
int32_t sndMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
||||||
|
|
||||||
int32_t sndDropTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
int32_t sndDropTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
||||||
|
|
||||||
int32_t sndStopTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
int32_t sndStopTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
||||||
int32_t sndResumeTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
int32_t sndResumeTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
|
||||||
|
|
||||||
|
|
|
@ -76,45 +76,158 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) {
|
||||||
return taosHashRemove(pMeta->pHash, &taskId, sizeof(int32_t));
|
return taosHashRemove(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||||
/*SStreamExecMsgHead *pHead = pMsg->pCont;*/
|
SStreamMeta *pMeta = pNode->pMeta;
|
||||||
/*int32_t taskId = pHead->streamTaskId;*/
|
char *msg = pMsg->pCont;
|
||||||
/*SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId);*/
|
int32_t msgLen = pMsg->contLen;
|
||||||
/*if (pTask == NULL) {*/
|
|
||||||
/*return -1;*/
|
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||||
/*}*/
|
if (pTask == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
SDecoder decoder;
|
||||||
|
tDecoderInit(&decoder, (uint8_t *)msg, msgLen);
|
||||||
|
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
|
pTask->status = TASK_STATUS__IDLE;
|
||||||
|
|
||||||
|
pTask->inputQueue = streamQueueOpen();
|
||||||
|
pTask->outputQueue = streamQueueOpen();
|
||||||
|
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||||
|
pTask->outputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||||
|
|
||||||
|
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) goto FAIL;
|
||||||
|
|
||||||
|
pTask->pMsgCb = &pNode->msgCb;
|
||||||
|
|
||||||
|
ASSERT(pTask->execType != TASK_EXEC__NONE);
|
||||||
|
|
||||||
|
SReadHandle handle = {
|
||||||
|
.pMsgCb = &pNode->msgCb,
|
||||||
|
};
|
||||||
|
|
||||||
|
/*pTask->exec.inputHandle = NULL;*/
|
||||||
|
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||||
|
ASSERT(pTask->exec.executor);
|
||||||
|
|
||||||
|
streamSetupTrigger(pTask);
|
||||||
|
|
||||||
|
qInfo("deploy stream: stream id %ld task id %d child id %d on snode", pTask->streamId, pTask->taskId, pTask->childId);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
FAIL:
|
||||||
|
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
|
||||||
|
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t sndProcessTaskRunReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||||
|
SStreamMeta *pMeta = pNode->pMeta;
|
||||||
|
SStreamTaskRunReq *pReq = pMsg->pCont;
|
||||||
|
int32_t taskId = pReq->taskId;
|
||||||
|
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||||
|
streamTaskProcessRunReq(pTask, &pNode->msgCb);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
static int32_t sndProcessTaskDispatchReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||||
|
SStreamMeta *pMeta = pNode->pMeta;
|
||||||
|
|
||||||
|
char *msgStr = pMsg->pCont;
|
||||||
|
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||||
|
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
|
||||||
|
SStreamDispatchReq req;
|
||||||
|
SDecoder decoder;
|
||||||
|
tDecoderInit(&decoder, msgBody, msgLen);
|
||||||
|
tDecodeStreamDispatchReq(&decoder, &req);
|
||||||
|
int32_t taskId = req.taskId;
|
||||||
|
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||||
|
SRpcMsg rsp = {
|
||||||
|
.info = pMsg->info,
|
||||||
|
.code = 0,
|
||||||
|
};
|
||||||
|
streamProcessDispatchReq(pTask, &pNode->msgCb, &req, &rsp);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t sndProcessTaskRecoverReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||||
|
SStreamMeta *pMeta = pNode->pMeta;
|
||||||
|
|
||||||
|
SStreamTaskRecoverReq *pReq = pMsg->pCont;
|
||||||
|
int32_t taskId = pReq->taskId;
|
||||||
|
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||||
|
streamProcessRecoverReq(pTask, &pNode->msgCb, pReq, pMsg);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t sndProcessTaskDispatchRsp(SSnode *pNode, SRpcMsg *pMsg) {
|
||||||
|
SStreamMeta *pMeta = pNode->pMeta;
|
||||||
|
|
||||||
|
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
|
int32_t taskId = pRsp->taskId;
|
||||||
|
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||||
|
streamProcessDispatchRsp(pTask, &pNode->msgCb, pRsp);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t sndProcessTaskRecoverRsp(SSnode *pNode, SRpcMsg *pMsg) {
|
||||||
|
SStreamMeta *pMeta = pNode->pMeta;
|
||||||
|
|
||||||
|
SStreamTaskRecoverRsp *pRsp = pMsg->pCont;
|
||||||
|
int32_t taskId = pRsp->taskId;
|
||||||
|
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
|
||||||
|
streamProcessRecoverRsp(pTask, pRsp);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t sndProcessTaskDropReq(SSnode *pNode, SRpcMsg *pMsg) {
|
||||||
|
SStreamMeta *pMeta = pNode->pMeta;
|
||||||
|
|
||||||
|
char *msg = pMsg->pCont;
|
||||||
|
int32_t msgLen = pMsg->contLen;
|
||||||
|
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
|
||||||
|
int32_t code = taosHashRemove(pMeta->pHash, &pReq->taskId, sizeof(int32_t));
|
||||||
|
ASSERT(code == 0);
|
||||||
|
if (code == 0) {
|
||||||
|
// sendrsp
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
// stream deploy
|
// stream deploy
|
||||||
// stream stop/resume
|
// stream stop/resume
|
||||||
// operator exec
|
// operator exec
|
||||||
if (pMsg->msgType == TDMT_STREAM_TASK_DEPLOY) {
|
switch (pMsg->msgType) {
|
||||||
void *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
case TDMT_STREAM_TASK_DEPLOY:
|
||||||
SStreamTask *pTask = taosMemoryMalloc(sizeof(SStreamTask));
|
return sndProcessTaskDeployReq(pSnode, pMsg);
|
||||||
if (pTask == NULL) {
|
case TDMT_VND_STREAM_TASK_DROP:
|
||||||
|
return sndProcessTaskDropReq(pSnode, pMsg);
|
||||||
|
default:
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return;
|
|
||||||
}
|
|
||||||
SDecoder decoder;
|
|
||||||
tDecoderInit(&decoder, msg, pMsg->contLen - sizeof(SMsgHead));
|
|
||||||
tDecodeSStreamTask(&decoder, pTask);
|
|
||||||
tDecoderClear(&decoder);
|
|
||||||
|
|
||||||
sndMetaDeployTask(pSnode->pMeta, pTask);
|
|
||||||
/*} else if (pMsg->msgType == TDMT_SND_TASK_EXEC) {*/
|
|
||||||
/*sndProcessTaskExecReq(pSnode, pMsg);*/
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
// operator exec
|
switch (pMsg->msgType) {
|
||||||
/*if (pMsg->msgType == TDMT_SND_TASK_EXEC) {*/
|
case TDMT_STREAM_TASK_RUN:
|
||||||
/*sndProcessTaskExecReq(pSnode, pMsg);*/
|
return sndProcessTaskRunReq(pSnode, pMsg);
|
||||||
/*} else {*/
|
case TDMT_STREAM_TASK_DISPATCH:
|
||||||
ASSERT(0);
|
return sndProcessTaskDispatchReq(pSnode, pMsg);
|
||||||
/*}*/
|
case TDMT_STREAM_TASK_RECOVER:
|
||||||
|
return sndProcessTaskRecoverReq(pSnode, pMsg);
|
||||||
|
case TDMT_STREAM_TASK_DISPATCH_RSP:
|
||||||
|
return sndProcessTaskDispatchRsp(pSnode, pMsg);
|
||||||
|
case TDMT_STREAM_TASK_RECOVER_RSP:
|
||||||
|
return sndProcessTaskRecoverRsp(pSnode, pMsg);
|
||||||
|
default:
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -125,10 +125,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
|
|
||||||
if (offset.type == TMQ_OFFSET__SNAPSHOT) {
|
if (offset.type == TMQ_OFFSET__SNAPSHOT) {
|
||||||
tqDebug("receive offset commit msg to %s on vg %d, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey,
|
tqDebug("receive offset commit msg to %s on vg %d, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey,
|
||||||
pTq->pVnode->config.vgId, offset.uid, offset.ts);
|
TD_VID(pTq->pVnode), offset.uid, offset.ts);
|
||||||
} else if (offset.type == TMQ_OFFSET__LOG) {
|
} else if (offset.type == TMQ_OFFSET__LOG) {
|
||||||
tqDebug("receive offset commit msg to %s on vg %d, offset(type:log) version: %ld", offset.subKey,
|
tqDebug("receive offset commit msg to %s on vg %d, offset(type:log) version: %ld", offset.subKey,
|
||||||
pTq->pVnode->config.vgId, offset.version);
|
TD_VID(pTq->pVnode), offset.version);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
@ -159,7 +159,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
if (pOffset != NULL) {
|
if (pOffset != NULL) {
|
||||||
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
|
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
|
||||||
tqDebug("consumer %ld, restore offset of %s on vg %d, offset(type:log) version: %ld", consumerId, pReq->subKey,
|
tqDebug("consumer %ld, restore offset of %s on vg %d, offset(type:log) version: %ld", consumerId, pReq->subKey,
|
||||||
pTq->pVnode->config.vgId, pOffset->version);
|
TD_VID(pTq->pVnode), pOffset->version);
|
||||||
fetchOffset = pOffset->version + 1;
|
fetchOffset = pOffset->version + 1;
|
||||||
} else {
|
} else {
|
||||||
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
||||||
|
@ -167,13 +167,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
|
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
|
||||||
fetchOffset = walGetCommittedVer(pTq->pWal);
|
fetchOffset = walGetCommittedVer(pTq->pWal);
|
||||||
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__NONE) {
|
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__NONE) {
|
||||||
tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s", consumerId,
|
tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode),
|
||||||
pTq->pVnode->config.vgId, pReq->subKey);
|
pReq->subKey);
|
||||||
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
tqDebug("consumer %ld, restore offset of %s on vg %d failed, config is %ld, set to %ld", consumerId, pReq->subKey,
|
tqDebug("consumer %ld, restore offset of %s on vg %d failed, config is %ld, set to %ld", consumerId, pReq->subKey,
|
||||||
pTq->pVnode->config.vgId, pReq->currentOffset, fetchOffset);
|
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -183,14 +183,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
||||||
/*ASSERT(pHandle);*/
|
/*ASSERT(pHandle);*/
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, pTq->pVnode->config.vgId,
|
tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode),
|
||||||
pReq->subKey);
|
pReq->subKey);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHandle->consumerId != consumerId) {
|
if (pHandle->consumerId != consumerId) {
|
||||||
tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld",
|
tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld",
|
||||||
consumerId, pTq->pVnode->config.vgId, pReq->subKey, pHandle->consumerId);
|
consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -304,7 +304,6 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: persist meta into tdb
|
|
||||||
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
SMqRebVgReq req = {0};
|
SMqRebVgReq req = {0};
|
||||||
tDecodeSMqRebVgReq(msg, &req);
|
tDecodeSMqRebVgReq(msg, &req);
|
||||||
|
@ -346,10 +345,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
pHandle->execHandle.execTb.suid = req.suid;
|
pHandle->execHandle.execTb.suid = req.suid;
|
||||||
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
||||||
tsdbGetCtbIdList(pTq->pVnode->pMeta, req.suid, tbUidList);
|
tsdbGetCtbIdList(pTq->pVnode->pMeta, req.suid, tbUidList);
|
||||||
tqDebug("vg %d, tq try get suid: %ld", pTq->pVnode->config.vgId, req.suid);
|
tqDebug("vg %d, tq try get suid: %ld", TD_VID(pTq->pVnode), req.suid);
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||||
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
||||||
tqDebug("vg %d, idx %d, uid: %ld", pTq->pVnode->config.vgId, i, tbUid);
|
tqDebug("vg %d, idx %d, uid: %ld", TD_VID(pTq->pVnode), i, tbUid);
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < 5; i++) {
|
for (int32_t i = 0; i < 5; i++) {
|
||||||
tqReadHandleSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
|
tqReadHandleSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
|
||||||
|
@ -400,16 +399,21 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
// exec
|
// exec
|
||||||
if (pTask->execType != TASK_EXEC__NONE) {
|
if (pTask->execType != TASK_EXEC__NONE) {
|
||||||
// expand runners
|
// expand runners
|
||||||
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
if (pTask->dataScan) {
|
||||||
SReadHandle handle = {
|
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||||
.reader = pStreamReader,
|
SReadHandle handle = {
|
||||||
.meta = pTq->pVnode->pMeta,
|
.reader = pStreamReader,
|
||||||
.pMsgCb = &pTq->pVnode->msgCb,
|
.meta = pTq->pVnode->pMeta,
|
||||||
.vnode = pTq->pVnode,
|
.pMsgCb = &pTq->pVnode->msgCb,
|
||||||
};
|
.vnode = pTq->pVnode,
|
||||||
pTask->exec.inputHandle = pStreamReader;
|
};
|
||||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
/*pTask->exec.inputHandle = pStreamReader;*/
|
||||||
ASSERT(pTask->exec.executor);
|
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||||
|
ASSERT(pTask->exec.executor);
|
||||||
|
} else {
|
||||||
|
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
|
||||||
|
ASSERT(pTask->exec.executor);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// sink
|
// sink
|
||||||
|
@ -431,7 +435,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
|
|
||||||
streamSetupTrigger(pTask);
|
streamSetupTrigger(pTask);
|
||||||
|
|
||||||
tqInfo("deploy stream task id %d child id %d on vg %d", pTask->taskId, pTask->childId, pTq->pVnode->config.vgId);
|
tqInfo("deploy stream task id %d child id %d on vg %d", pTask->taskId, pTask->childId, TD_VID(pTq->pVnode));
|
||||||
|
|
||||||
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
|
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
|
||||||
|
|
||||||
|
@ -464,7 +468,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamLaunchByWrite(pTask, pTq->pVnode->config.vgId, &pTq->pVnode->msgCb) < 0) {
|
if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode), &pTq->pVnode->msgCb) < 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -534,9 +538,9 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||||
int32_t code = taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
|
int32_t code = taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
|
||||||
|
ASSERT(code == 0);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
// sendrsp
|
// sendrsp
|
||||||
}
|
}
|
||||||
ASSERT(code == 0);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,7 +99,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
|
||||||
}
|
}
|
||||||
|
|
||||||
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
|
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
|
||||||
if (msg == NULL || streamReadHandle == NULL) {
|
if (msg == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,10 +13,10 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "os.h"
|
|
||||||
#include "tref.h"
|
|
||||||
#include "dataSinkMgt.h"
|
#include "dataSinkMgt.h"
|
||||||
|
#include "os.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
|
#include "tref.h"
|
||||||
#include "tudf.h"
|
#include "tudf.h"
|
||||||
|
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
|
@ -24,15 +24,13 @@
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
|
|
||||||
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
|
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
|
||||||
int32_t exchangeObjRefPool = -1;
|
int32_t exchangeObjRefPool = -1;
|
||||||
|
|
||||||
static void initRefPool() {
|
static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); }
|
||||||
exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
|
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
|
||||||
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
|
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
|
||||||
assert(readHandle != NULL && pSubplan != NULL);
|
assert(pSubplan != NULL);
|
||||||
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
|
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
|
||||||
|
|
||||||
taosThreadOnce(&initPoolOnce, initRefPool);
|
taosThreadOnce(&initPoolOnce, initRefPool);
|
||||||
|
@ -47,57 +45,57 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (handle) {
|
if (handle) {
|
||||||
void* pSinkParam = NULL;
|
void* pSinkParam = NULL;
|
||||||
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo);
|
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
|
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
|
||||||
}
|
}
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
// if failed to add ref for all tables in this query, abort current query
|
// if failed to add ref for all tables in this query, abort current query
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef TEST_IMPL
|
#ifdef TEST_IMPL
|
||||||
// wait moment
|
// wait moment
|
||||||
int waitMoment(SQInfo* pQInfo){
|
int waitMoment(SQInfo* pQInfo) {
|
||||||
if(pQInfo->sql) {
|
if (pQInfo->sql) {
|
||||||
int ms = 0;
|
int ms = 0;
|
||||||
char* pcnt = strstr(pQInfo->sql, " count(*)");
|
char* pcnt = strstr(pQInfo->sql, " count(*)");
|
||||||
if(pcnt) return 0;
|
if (pcnt) return 0;
|
||||||
|
|
||||||
char* pos = strstr(pQInfo->sql, " t_");
|
char* pos = strstr(pQInfo->sql, " t_");
|
||||||
if(pos){
|
if (pos) {
|
||||||
pos += 3;
|
pos += 3;
|
||||||
ms = atoi(pos);
|
ms = atoi(pos);
|
||||||
while(*pos >= '0' && *pos <= '9'){
|
while (*pos >= '0' && *pos <= '9') {
|
||||||
pos ++;
|
pos++;
|
||||||
}
|
}
|
||||||
char unit_char = *pos;
|
char unit_char = *pos;
|
||||||
if(unit_char == 'h'){
|
if (unit_char == 'h') {
|
||||||
ms *= 3600*1000;
|
ms *= 3600 * 1000;
|
||||||
} else if(unit_char == 'm'){
|
} else if (unit_char == 'm') {
|
||||||
ms *= 60*1000;
|
ms *= 60 * 1000;
|
||||||
} else if(unit_char == 's'){
|
} else if (unit_char == 's') {
|
||||||
ms *= 1000;
|
ms *= 1000;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(ms == 0) return 0;
|
if (ms == 0) return 0;
|
||||||
printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
|
printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
|
||||||
|
|
||||||
if(ms < 1000) {
|
if (ms < 1000) {
|
||||||
taosMsleep(ms);
|
taosMsleep(ms);
|
||||||
} else {
|
} else {
|
||||||
int used_ms = 0;
|
int used_ms = 0;
|
||||||
while(used_ms < ms) {
|
while (used_ms < ms) {
|
||||||
taosMsleep(1000);
|
taosMsleep(1000);
|
||||||
used_ms += 1000;
|
used_ms += 1000;
|
||||||
if(isTaskKilled(pQInfo)){
|
if (isTaskKilled(pQInfo)) {
|
||||||
printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
|
printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -108,15 +106,14 @@ int waitMoment(SQInfo* pQInfo){
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
int64_t threadId = taosGetSelfPthreadId();
|
int64_t threadId = taosGetSelfPthreadId();
|
||||||
|
|
||||||
*pRes = NULL;
|
*pRes = NULL;
|
||||||
int64_t curOwner = 0;
|
int64_t curOwner = 0;
|
||||||
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
|
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
|
||||||
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
|
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
|
||||||
(void*)curOwner);
|
|
||||||
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
|
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
|
||||||
return pTaskInfo->code;
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
|
@ -152,18 +149,18 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
||||||
|
|
||||||
cleanUpUdfs();
|
cleanUpUdfs();
|
||||||
|
|
||||||
int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
|
int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
|
||||||
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
|
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
|
||||||
|
|
||||||
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
|
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
|
||||||
GET_TASKID(pTaskInfo), current, total, 0, el/1000.0);
|
GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);
|
||||||
|
|
||||||
atomic_store_64(&pTaskInfo->owner, 0);
|
atomic_store_64(&pTaskInfo->owner, 0);
|
||||||
return pTaskInfo->code;
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qKillTask(qTaskInfo_t qinfo) {
|
int32_t qKillTask(qTaskInfo_t qinfo) {
|
||||||
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
||||||
|
|
||||||
if (pTaskInfo == NULL) {
|
if (pTaskInfo == NULL) {
|
||||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
|
@ -182,7 +179,7 @@ int32_t qKillTask(qTaskInfo_t qinfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
|
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
|
||||||
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
||||||
|
|
||||||
if (pTaskInfo == NULL) {
|
if (pTaskInfo == NULL) {
|
||||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
|
@ -195,7 +192,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
|
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
|
||||||
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
||||||
|
|
||||||
if (pTaskInfo == NULL) {
|
if (pTaskInfo == NULL) {
|
||||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
|
@ -205,18 +202,17 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void qDestroyTask(qTaskInfo_t qTaskHandle) {
|
void qDestroyTask(qTaskInfo_t qTaskHandle) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
|
||||||
qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
|
qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
|
||||||
|
|
||||||
queryCostStatis(pTaskInfo); // print the query cost summary
|
queryCostStatis(pTaskInfo); // print the query cost summary
|
||||||
doDestroyTask(pTaskInfo);
|
doDestroyTask(pTaskInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) {
|
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes) {
|
||||||
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
int32_t capacity = 0;
|
int32_t capacity = 0;
|
||||||
|
|
||||||
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum);
|
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -4557,11 +4557,17 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
STimeWindowAggSupp twSup = {
|
STimeWindowAggSupp twSup = {
|
||||||
.waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
|
.waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
|
||||||
tsdbReaderT pDataReader = NULL;
|
tsdbReaderT pDataReader = NULL;
|
||||||
|
|
||||||
|
if (pHandle) {
|
||||||
|
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
|
||||||
|
}
|
||||||
|
#if 0
|
||||||
if (pHandle->vnode) {
|
if (pHandle->vnode) {
|
||||||
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
|
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
|
||||||
} else {
|
} else {
|
||||||
getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo, pTagCond);
|
getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo, pTagCond);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (pDataReader == NULL && terrno != 0) {
|
if (pDataReader == NULL && terrno != 0) {
|
||||||
qDebug("%s pDataReader is NULL", GET_TASKID(pTaskInfo));
|
qDebug("%s pDataReader is NULL", GET_TASKID(pTaskInfo));
|
||||||
|
@ -4894,8 +4900,8 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* extractPartitionColInfo(SNodeList* pNodeList) {
|
SArray* extractPartitionColInfo(SNodeList* pNodeList) {
|
||||||
if(!pNodeList) {
|
if (!pNodeList) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||||
|
|
|
@ -537,7 +537,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
//taosSsleep(20);
|
// taosSsleep(20);
|
||||||
|
|
||||||
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
|
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
|
||||||
|
|
||||||
|
@ -800,23 +800,23 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
|
||||||
if (!pResult) {
|
if (!pResult) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pResult->info.groupId == pInfo->groupId) {
|
if (pResult->info.groupId == pInfo->groupId) {
|
||||||
return pResult;
|
return pResult;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Todo(liuyao) for partition by column
|
/* Todo(liuyao) for partition by column
|
||||||
SSDataBlock* pBlock = createOneDataBlock(pResult, true);
|
SSDataBlock* pBlock = createOneDataBlock(pResult, true);
|
||||||
blockDataCleanup(pResult);
|
blockDataCleanup(pResult);
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||||
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, i);
|
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, i);
|
||||||
if (id == pInfo->groupId) {
|
if (id == pInfo->groupId) {
|
||||||
copyOneRow(pResult, pBlock, i);
|
copyOneRow(pResult, pBlock, i);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
return pResult;
|
||||||
return pResult;
|
*/
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
|
static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
|
||||||
|
@ -831,7 +831,7 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
|
||||||
int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
|
int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
|
||||||
pInfo->groupId = getGroupId(pInfo->pOperatorDumy, pBlock, rowId);
|
pInfo->groupId = getGroupId(pInfo->pOperatorDumy, pBlock, rowId);
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
for ( ; i < size; i++) {
|
for (; i < size; i++) {
|
||||||
rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
|
rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
|
||||||
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, rowId);
|
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, rowId);
|
||||||
if (pInfo->groupId != id) {
|
if (pInfo->groupId != id) {
|
||||||
|
@ -1061,9 +1061,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
||||||
SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;
|
SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;
|
||||||
|
|
||||||
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
||||||
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
|
||||||
|
|
||||||
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
|
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
pInfo->pColMatchInfo =
|
pInfo->pColMatchInfo =
|
||||||
|
@ -1081,16 +1078,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the extract column id to streamHandle
|
|
||||||
tqReadHandleSetColIdList((STqReadHandle*)pHandle->reader, pColIds);
|
|
||||||
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
|
|
||||||
int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList);
|
|
||||||
if (code != 0) {
|
|
||||||
taosArrayDestroy(tableIdList);
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
taosArrayDestroy(tableIdList);
|
|
||||||
|
|
||||||
pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
|
pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
|
||||||
if (pInfo->pBlockLists == NULL) {
|
if (pInfo->pBlockLists == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1102,30 +1089,44 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSTInfo->interval.interval > 0 && pDataReader) {
|
if (pDataReader) {
|
||||||
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
|
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
||||||
} else {
|
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
|
||||||
pInfo->pUpdateInfo = NULL;
|
if (pSTInfo->interval.interval > 0) {
|
||||||
|
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
|
||||||
|
} else {
|
||||||
|
pInfo->pUpdateInfo = NULL;
|
||||||
|
}
|
||||||
|
pInfo->pOperatorDumy = pTableScanDummy;
|
||||||
|
pInfo->interval = pSTInfo->interval;
|
||||||
|
|
||||||
|
// set the extract column id to streamHandle
|
||||||
|
tqReadHandleSetColIdList((STqReadHandle*)pHandle->reader, pColIds);
|
||||||
|
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
|
||||||
|
int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList);
|
||||||
|
if (code != 0) {
|
||||||
|
taosArrayDestroy(tableIdList);
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
taosArrayDestroy(tableIdList);
|
||||||
|
pInfo->readHandle = *pHandle;
|
||||||
|
pInfo->streamBlockReader = pHandle->reader;
|
||||||
}
|
}
|
||||||
|
|
||||||
// create the pseduo columns info
|
// create the pseduo columns info
|
||||||
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
|
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
|
||||||
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
|
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
|
||||||
|
pInfo->tableUid = pScanPhyNode->uid;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->readHandle = *pHandle;
|
|
||||||
pInfo->tableUid = pScanPhyNode->uid;
|
|
||||||
pInfo->streamBlockReader = pHandle->reader;
|
|
||||||
pInfo->pRes = createResDataBlock(pDescNode);
|
pInfo->pRes = createResDataBlock(pDescNode);
|
||||||
pInfo->pUpdateRes = createResDataBlock(pDescNode);
|
pInfo->pUpdateRes = createResDataBlock(pDescNode);
|
||||||
pInfo->pCondition = pScanPhyNode->node.pConditions;
|
pInfo->pCondition = pScanPhyNode->node.pConditions;
|
||||||
pInfo->pDataReader = pDataReader;
|
pInfo->pDataReader = pDataReader;
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||||
pInfo->pOperatorDumy = pTableScanDummy;
|
|
||||||
pInfo->interval = pSTInfo->interval;
|
|
||||||
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
|
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
|
||||||
pInfo->groupId = 0;
|
pInfo->groupId = 0;
|
||||||
|
|
||||||
pOperator->name = "StreamBlockScanOperator";
|
pOperator->name = "StreamBlockScanOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
|
@ -1947,7 +1948,7 @@ _error:
|
||||||
|
|
||||||
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
|
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
|
||||||
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
|
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
STableMergeScanInfo* pInfo = pOperator->info;
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
|
SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
|
||||||
|
@ -2200,8 +2201,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock =
|
SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
|
||||||
getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
|
|
||||||
|
|
||||||
if (pBlock != NULL) {
|
if (pBlock != NULL) {
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
|
@ -2234,20 +2234,20 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
|
|
||||||
typedef struct STableMergeScanExecInfo {
|
typedef struct STableMergeScanExecInfo {
|
||||||
SFileBlockLoadRecorder blockRecorder;
|
SFileBlockLoadRecorder blockRecorder;
|
||||||
SSortExecInfo sortExecInfo;
|
SSortExecInfo sortExecInfo;
|
||||||
} STableMergeScanExecInfo;
|
} STableMergeScanExecInfo;
|
||||||
|
|
||||||
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
||||||
ASSERT(pOptr != NULL);
|
ASSERT(pOptr != NULL);
|
||||||
// TODO: merge these two info into one struct
|
// TODO: merge these two info into one struct
|
||||||
STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
|
STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
|
||||||
STableMergeScanInfo* pInfo = pOptr->info;
|
STableMergeScanInfo* pInfo = pOptr->info;
|
||||||
execInfo->blockRecorder = pInfo->readRecorder;
|
execInfo->blockRecorder = pInfo->readRecorder;
|
||||||
execInfo->sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
|
execInfo->sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
|
||||||
|
|
||||||
*pOptrExplain = execInfo;
|
*pOptrExplain = execInfo;
|
||||||
*len = sizeof(STableMergeScanExecInfo);
|
*len = sizeof(STableMergeScanExecInfo);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2277,16 +2277,16 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
|
|
||||||
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
|
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
|
||||||
|
|
||||||
pInfo->readHandle = *readHandle;
|
pInfo->readHandle = *readHandle;
|
||||||
pInfo->interval = extractIntervalInfo(pTableScanNode);
|
pInfo->interval = extractIntervalInfo(pTableScanNode);
|
||||||
pInfo->sample.sampleRatio = pTableScanNode->ratio;
|
pInfo->sample.sampleRatio = pTableScanNode->ratio;
|
||||||
pInfo->sample.seed = taosGetTimestampSec();
|
pInfo->sample.seed = taosGetTimestampSec();
|
||||||
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
|
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
|
||||||
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
|
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
|
||||||
pInfo->dataReaders = dataReaders;
|
pInfo->dataReaders = dataReaders;
|
||||||
pInfo->scanFlag = MAIN_SCAN;
|
pInfo->scanFlag = MAIN_SCAN;
|
||||||
pInfo->pColMatchInfo = pColList;
|
pInfo->pColMatchInfo = pColList;
|
||||||
pInfo->curTWinIdx = 0;
|
pInfo->curTWinIdx = 0;
|
||||||
|
|
||||||
pInfo->pResBlock = createResDataBlock(pDescNode);
|
pInfo->pResBlock = createResDataBlock(pDescNode);
|
||||||
|
|
||||||
|
@ -2304,22 +2304,22 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
|
|
||||||
int32_t rowSize = pInfo->pResBlock->info.rowSize;
|
int32_t rowSize = pInfo->pResBlock->info.rowSize;
|
||||||
pInfo->bufPageSize = getProperSortPageSize(rowSize);
|
pInfo->bufPageSize = getProperSortPageSize(rowSize);
|
||||||
|
|
||||||
// todo the total available buffer should be determined by total capacity of buffer of this task.
|
// todo the total available buffer should be determined by total capacity of buffer of this task.
|
||||||
// the additional one is reserved for merge result
|
// the additional one is reserved for merge result
|
||||||
pInfo->sortBufSize = pInfo->bufPageSize * (taosArrayGetSize(dataReaders) + 1);
|
pInfo->sortBufSize = pInfo->bufPageSize * (taosArrayGetSize(dataReaders) + 1);
|
||||||
pInfo->hasGroupId = false;
|
pInfo->hasGroupId = false;
|
||||||
pInfo->prefetchedTuple = NULL;
|
pInfo->prefetchedTuple = NULL;
|
||||||
|
|
||||||
pOperator->name = "TableMergeScanOperator";
|
pOperator->name = "TableMergeScanOperator";
|
||||||
// TODO : change it
|
// TODO : change it
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->numOfExprs = numOfCols;
|
pOperator->numOfExprs = numOfCols;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
initResultSizeInfo(pOperator, 1024);
|
initResultSizeInfo(pOperator, 1024);
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
|
|
|
@ -36,11 +36,11 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||||
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pTask->inputType) < 0) return -1;
|
if (tEncodeI8(pEncoder, pTask->inputType) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
|
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1;
|
if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
|
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
|
if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
|
||||||
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
|
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pTask->dataScan) < 0) return -1;
|
||||||
|
|
||||||
if (tEncodeI32(pEncoder, pTask->childId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pTask->childId) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
|
||||||
|
@ -84,11 +84,11 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||||
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pTask->inputType) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pTask->inputType) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1;
|
|
||||||
if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
|
||||||
if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
|
||||||
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
|
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pTask->dataScan) < 0) return -1;
|
||||||
|
|
||||||
if (tDecodeI32(pDecoder, &pTask->childId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pTask->childId) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
|
||||||
|
|
Loading…
Reference in New Issue