Merge pull request #16194 from taosdata/feature/stream
refactor(mnode): drop stream task
This commit is contained in:
commit
6de1ce0f8c
|
@ -98,10 +98,9 @@ int32_t create_stream() {
|
||||||
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
|
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
|
||||||
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
||||||
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
||||||
pRes =
|
pRes = taos_query(pConn,
|
||||||
taos_query(pConn,
|
"create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, "
|
||||||
"create stream stream1 trigger max_delay 10s into outstb as select _wstart, sum(k) from st1 partition "
|
"count(k) from st1 partition by tbname interval(20s) ");
|
||||||
"by tbname session(ts, 10s) ");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -60,6 +60,7 @@ enum {
|
||||||
STREAM_INPUT__DATA_RETRIEVE,
|
STREAM_INPUT__DATA_RETRIEVE,
|
||||||
STREAM_INPUT__GET_RES,
|
STREAM_INPUT__GET_RES,
|
||||||
STREAM_INPUT__CHECKPOINT,
|
STREAM_INPUT__CHECKPOINT,
|
||||||
|
STREAM_INPUT__DESTROY,
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef enum EStreamType {
|
typedef enum EStreamType {
|
||||||
|
|
|
@ -53,6 +53,7 @@ enum {
|
||||||
TASK_SCHED_STATUS__WAITING,
|
TASK_SCHED_STATUS__WAITING,
|
||||||
TASK_SCHED_STATUS__ACTIVE,
|
TASK_SCHED_STATUS__ACTIVE,
|
||||||
TASK_SCHED_STATUS__FAILED,
|
TASK_SCHED_STATUS__FAILED,
|
||||||
|
TASK_SCHED_STATUS__DROPPING,
|
||||||
};
|
};
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -127,6 +128,10 @@ typedef struct {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
} SStreamCheckpoint;
|
} SStreamCheckpoint;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int8_t type;
|
||||||
|
} SStreamTaskDestroy;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
SSDataBlock* pBlock;
|
SSDataBlock* pBlock;
|
||||||
|
@ -211,7 +216,6 @@ typedef struct {
|
||||||
void* vnode;
|
void* vnode;
|
||||||
FTbSink* tbSinkFunc;
|
FTbSink* tbSinkFunc;
|
||||||
STSchema* pTSchema;
|
STSchema* pTSchema;
|
||||||
SHashObj* pHash; // groupId to tbuid
|
|
||||||
} STaskSinkTb;
|
} STaskSinkTb;
|
||||||
|
|
||||||
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
|
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
|
||||||
|
|
|
@ -291,6 +291,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1)
|
#define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1)
|
||||||
#define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2)
|
#define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2)
|
||||||
#define TSDB_CODE_MND_STREAM_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03F3)
|
#define TSDB_CODE_MND_STREAM_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03F3)
|
||||||
|
#define TSDB_CODE_MND_STREAM_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x03F4)
|
||||||
|
|
||||||
// mnode-sma
|
// mnode-sma
|
||||||
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)
|
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)
|
||||||
|
|
|
@ -424,6 +424,8 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
}
|
}
|
||||||
mndAddTaskToTaskSet(taskSourceLevel, pTask);
|
mndAddTaskToTaskSet(taskSourceLevel, pTask);
|
||||||
|
|
||||||
|
pTask->triggerParam = 0;
|
||||||
|
|
||||||
// source
|
// source
|
||||||
pTask->taskLevel = TASK_LEVEL__SOURCE;
|
pTask->taskLevel = TASK_LEVEL__SOURCE;
|
||||||
|
|
||||||
|
|
|
@ -2021,8 +2021,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
|
||||||
FOREACH(pNode, pNodeList) {
|
FOREACH(pNode, pNodeList) {
|
||||||
SColumnNode *pCol = (SColumnNode *)pNode;
|
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||||
|
|
||||||
if (pCol->tableId != suid) {
|
if (pCol->tableId == suid) {
|
||||||
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
|
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -2045,6 +2044,16 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
|
||||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
|
if (pStream->smaId != 0) {
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pStream->targetStbUid == suid) {
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
SNode *pAst = NULL;
|
SNode *pAst = NULL;
|
||||||
if (nodesStringToNode(pStream->ast, &pAst) != 0) {
|
if (nodesStringToNode(pStream->ast, &pAst) != 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -2057,8 +2066,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
|
||||||
FOREACH(pNode, pNodeList) {
|
FOREACH(pNode, pNodeList) {
|
||||||
SColumnNode *pCol = (SColumnNode *)pNode;
|
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||||
|
|
||||||
if (pCol->tableId != suid) {
|
if (pCol->tableId == suid) {
|
||||||
mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
|
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -628,8 +628,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
||||||
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
|
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
|
||||||
}
|
}
|
||||||
|
@ -640,8 +638,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
||||||
pTask->outputQueue = streamQueueOpen();
|
pTask->outputQueue = streamQueueOpen();
|
||||||
|
|
||||||
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
|
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
|
||||||
code = -1;
|
return -1;
|
||||||
goto FAIL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||||
|
@ -686,14 +683,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
||||||
|
|
||||||
streamSetupTrigger(pTask);
|
streamSetupTrigger(pTask);
|
||||||
|
|
||||||
tqInfo("deploy stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
|
tqInfo("expand stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
|
||||||
pTask->selfChildId);
|
pTask->selfChildId);
|
||||||
|
return 0;
|
||||||
FAIL:
|
|
||||||
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
|
|
||||||
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
|
|
||||||
// TODO free executor
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
|
|
|
@ -231,11 +231,12 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||||
|
|
||||||
ASSERT(pTask->tbSink.pTSchema);
|
ASSERT(pTask->tbSink.pTSchema);
|
||||||
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
||||||
SSubmitReq* pReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
|
SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
|
||||||
pTask->tbSink.stbFullName, &deleteReq);
|
pTask->tbSink.stbFullName, &deleteReq);
|
||||||
|
|
||||||
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
|
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
|
||||||
|
|
||||||
|
if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
int32_t len;
|
int32_t len;
|
||||||
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
|
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
|
||||||
|
@ -244,21 +245,21 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
void* buf = rpcMallocCont(len + sizeof(SMsgHead));
|
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
|
||||||
tEncoderInit(&encoder, abuf, len);
|
tEncoderInit(&encoder, abuf, len);
|
||||||
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
|
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
((SMsgHead*)buf)->vgId = pVnode->config.vgId;
|
((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId;
|
||||||
|
|
||||||
if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
|
|
||||||
SRpcMsg msg = {
|
SRpcMsg msg = {
|
||||||
.msgType = TDMT_VND_BATCH_DEL,
|
.msgType = TDMT_VND_BATCH_DEL,
|
||||||
.pCont = buf,
|
.pCont = serializedDeleteReq,
|
||||||
.contLen = len + sizeof(SMsgHead),
|
.contLen = len + sizeof(SMsgHead),
|
||||||
};
|
};
|
||||||
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
||||||
|
rpcFreeCont(serializedDeleteReq);
|
||||||
tqDebug("failed to put into write-queue since %s", terrstr());
|
tqDebug("failed to put into write-queue since %s", terrstr());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -268,11 +269,12 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||||
// build write msg
|
// build write msg
|
||||||
SRpcMsg msg = {
|
SRpcMsg msg = {
|
||||||
.msgType = TDMT_VND_SUBMIT,
|
.msgType = TDMT_VND_SUBMIT,
|
||||||
.pCont = pReq,
|
.pCont = submitReq,
|
||||||
.contLen = ntohl(pReq->length),
|
.contLen = ntohl(submitReq->length),
|
||||||
};
|
};
|
||||||
|
|
||||||
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
||||||
|
rpcFreeCont(submitReq);
|
||||||
tqDebug("failed to put into write-queue since %s", terrstr());
|
tqDebug("failed to put into write-queue since %s", terrstr());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,6 @@ typedef struct {
|
||||||
|
|
||||||
static SStreamGlobalEnv streamEnv;
|
static SStreamGlobalEnv streamEnv;
|
||||||
|
|
||||||
int32_t streamExec(SStreamTask* pTask);
|
|
||||||
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch);
|
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch);
|
||||||
|
|
||||||
int32_t streamDispatch(SStreamTask* pTask);
|
int32_t streamDispatch(SStreamTask* pTask);
|
||||||
|
|
|
@ -185,7 +185,9 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
||||||
tFreeStreamDispatchReq(pReq);
|
tFreeStreamDispatchReq(pReq);
|
||||||
|
|
||||||
if (exec) {
|
if (exec) {
|
||||||
streamTryExec(pTask);
|
if (streamTryExec(pTask) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
streamDispatch(pTask);
|
streamDispatch(pTask);
|
||||||
|
@ -221,7 +223,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamProcessRunReq(SStreamTask* pTask) {
|
int32_t streamProcessRunReq(SStreamTask* pTask) {
|
||||||
streamTryExec(pTask);
|
if (streamTryExec(pTask) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
streamDispatch(pTask);
|
streamDispatch(pTask);
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
#include "tstream.h"
|
#include "tstream.h"
|
||||||
|
#include "ttimer.h"
|
||||||
|
|
||||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
|
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
|
||||||
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
||||||
|
@ -99,16 +100,19 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char*
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
|
if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn) < 0) {
|
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn) < 0) {
|
||||||
|
taosHashRemove(pMeta->pTasks, &pTask->taskId, sizeof(int32_t));
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
FAIL:
|
FAIL:
|
||||||
if (pTask) taosMemoryFree(pTask);
|
if (pTask) tFreeSStreamTask(pTask);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -158,11 +162,28 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||||
SStreamTask* pTask = *ppTask;
|
SStreamTask* pTask = *ppTask;
|
||||||
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
|
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||||
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
|
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) {
|
if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) {
|
||||||
/*return -1;*/
|
/*return -1;*/
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pTask->triggerParam != 0) {
|
||||||
|
taosTmrStop(pTask->timer);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
int8_t schedStatus =
|
||||||
|
atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__DROPPING);
|
||||||
|
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
|
||||||
|
tFreeSStreamTask(pTask);
|
||||||
|
break;
|
||||||
|
} else if (schedStatus == TASK_SCHED_STATUS__DROPPING) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
taosMsleep(10);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tstream.h"
|
#include "streamInc.h"
|
||||||
|
|
||||||
SStreamQueue* streamQueueOpen() {
|
SStreamQueue* streamQueueOpen() {
|
||||||
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
|
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
|
||||||
|
@ -36,9 +36,12 @@ void streamQueueClose(SStreamQueue* queue) {
|
||||||
while (1) {
|
while (1) {
|
||||||
void* qItem = streamQueueNextItem(queue);
|
void* qItem = streamQueueNextItem(queue);
|
||||||
if (qItem) {
|
if (qItem) {
|
||||||
taosFreeQitem(qItem);
|
streamFreeQitem(qItem);
|
||||||
} else {
|
} else {
|
||||||
return;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
taosFreeQall(queue->qall);
|
||||||
|
taosCloseQueue(queue->queue);
|
||||||
|
taosMemoryFree(queue);
|
||||||
}
|
}
|
||||||
|
|
|
@ -152,9 +152,17 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tFreeSStreamTask(SStreamTask* pTask) {
|
void tFreeSStreamTask(SStreamTask* pTask) {
|
||||||
streamQueueClose(pTask->inputQueue);
|
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
|
||||||
streamQueueClose(pTask->outputQueue);
|
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
|
||||||
if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
|
if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
|
||||||
if (pTask->exec.executor) qDestroyTask(pTask->exec.executor);
|
if (pTask->exec.executor) qDestroyTask(pTask->exec.executor);
|
||||||
|
taosArrayDestroy(pTask->childEpInfo);
|
||||||
|
if (pTask->outputType == TASK_OUTPUT__TABLE) {
|
||||||
|
tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper);
|
||||||
|
taosMemoryFree(pTask->tbSink.pTSchema);
|
||||||
|
}
|
||||||
|
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||||
|
}
|
||||||
taosMemoryFree(pTask);
|
taosMemoryFree(pTask);
|
||||||
}
|
}
|
||||||
|
|
|
@ -293,6 +293,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_MUST_BE_DELETED, "Stream must be dropped first")
|
||||||
|
|
||||||
// mnode-sma
|
// mnode-sma
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")
|
||||||
|
|
Loading…
Reference in New Issue