refactor: do some internal refactor.
This commit is contained in:
parent
0facde6ddc
commit
e53b5d4392
|
@ -189,7 +189,6 @@ int32_t streamInit();
|
||||||
void streamCleanUp();
|
void streamCleanUp();
|
||||||
|
|
||||||
SStreamQueue* streamQueueOpen(int64_t cap);
|
SStreamQueue* streamQueueOpen(int64_t cap);
|
||||||
void streamQueueCleanup(SStreamQueue* pQueue);
|
|
||||||
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
|
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
|
||||||
|
|
||||||
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
|
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
|
||||||
|
@ -424,8 +423,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsg
|
||||||
|
|
||||||
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
|
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
|
||||||
|
|
||||||
int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem);
|
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem);
|
||||||
bool tInputQueueIsFull(const SStreamTask* pTask);
|
bool streamQueueIsFull(const STaosQueue* pQueue);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
|
|
|
@ -164,7 +164,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
|
||||||
// tqStream
|
// tqStream
|
||||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
||||||
int32_t tqScanWalForStreamTasks(STQ* pTq);
|
int32_t tqScanWalForStreamTasks(STQ* pTq);
|
||||||
int32_t tqSetStreamTasksReady(STQ* pTq);
|
int32_t tqCheckAndRunStreamTask(STQ* pTq);
|
||||||
int32_t tqStopStreamTasks(STQ* pTq);
|
int32_t tqStopStreamTasks(STQ* pTq);
|
||||||
|
|
||||||
// tq util
|
// tq util
|
||||||
|
|
|
@ -223,11 +223,11 @@ void tqClose(STQ*);
|
||||||
int tqPushMsg(STQ*, tmsg_t msgType);
|
int tqPushMsg(STQ*, tmsg_t msgType);
|
||||||
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
|
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
|
||||||
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
|
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
|
||||||
int tqStartStreamTasksAsync(STQ* pTq, bool ckPause); // restore all stream tasks after vnode launching completed.
|
int tqScanWalAsync(STQ* pTq, bool ckPause);
|
||||||
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqSetStreamTasksReadyAsync(STQ* pTq);
|
int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq);
|
||||||
|
|
||||||
int tqCommit(STQ*);
|
int tqCommit(STQ*);
|
||||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
||||||
|
|
|
@ -1199,7 +1199,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
streamSetStatusNormal(pTask);
|
streamSetStatusNormal(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
tqStartStreamTasksAsync(pTq, false);
|
tqScanWalAsync(pTq, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
@ -1341,7 +1341,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
if (taskId == STREAM_EXEC_TASK_STATUS_CHECK_ID) {
|
if (taskId == STREAM_EXEC_TASK_STATUS_CHECK_ID) {
|
||||||
tqSetStreamTasksReady(pTq);
|
tqCheckAndRunStreamTask(pTq);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1365,7 +1365,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
tqStartStreamTasksAsync(pTq, false);
|
tqScanWalAsync(pTq, false);
|
||||||
return 0;
|
return 0;
|
||||||
} else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
|
} else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
|
||||||
// todo add one function to handle this
|
// todo add one function to handle this
|
||||||
|
@ -1505,7 +1505,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
|
||||||
pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
||||||
streamStartScanHistoryAsync(pTask, igUntreated);
|
streamStartScanHistoryAsync(pTask, igUntreated);
|
||||||
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
|
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
|
||||||
tqStartStreamTasksAsync(pTq, false);
|
tqScanWalAsync(pTq, false);
|
||||||
} else {
|
} else {
|
||||||
streamSchedExec(pTask);
|
streamSchedExec(pTask);
|
||||||
}
|
}
|
||||||
|
@ -1824,7 +1824,7 @@ _end:
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
|
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
|
||||||
vInfo("vgId:%d, restart all stream tasks", vgId);
|
vInfo("vgId:%d, restart all stream tasks", vgId);
|
||||||
tqSetStreamTasksReadyAsync(pTq);
|
tqCheckAndRunStreamTaskAsync(pTq);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,7 +46,7 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
|
||||||
// 2. the vnode should be the leader.
|
// 2. the vnode should be the leader.
|
||||||
// 3. the stream is not suspended yet.
|
// 3. the stream is not suspended yet.
|
||||||
if ((!tsDisableStream) && (numOfTasks > 0) && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)) {
|
if ((!tsDisableStream) && (numOfTasks > 0) && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)) {
|
||||||
tqStartStreamTasksAsync(pTq, true);
|
tqScanWalAsync(pTq, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -56,7 +56,7 @@ int32_t tqScanWalForStreamTasks(STQ* pTq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSetStreamTasksReady(STQ* pTq) {
|
int32_t tqCheckAndRunStreamTask(STQ* pTq) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
|
|
||||||
|
@ -79,12 +79,12 @@ int32_t tqSetStreamTasksReady(STQ* pTq) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fill-history task can only be launched by related stream tasks.
|
||||||
if (pTask->info.fillHistory == 1) {
|
if (pTask->info.fillHistory == 1) {
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: how about the fill-history task?
|
|
||||||
if (pTask->status.downstreamReady == 1) {
|
if (pTask->status.downstreamReady == 1) {
|
||||||
tqDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
|
tqDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
|
||||||
pTask->id.idStr);
|
pTask->id.idStr);
|
||||||
|
@ -103,7 +103,7 @@ int32_t tqSetStreamTasksReady(STQ* pTq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSetStreamTasksReadyAsync(STQ* pTq) {
|
int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
|
|
||||||
|
@ -136,7 +136,7 @@ int32_t tqSetStreamTasksReadyAsync(STQ* pTq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStartStreamTasksAsync(STQ* pTq, bool ckPause) {
|
int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
|
|
||||||
|
@ -340,7 +340,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tInputQueueIsFull(pTask)) {
|
if (streamQueueIsFull(pTask->inputQueue->queue)) {
|
||||||
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
|
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
|
@ -386,7 +386,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
|
|
||||||
if (pItem != NULL) {
|
if (pItem != NULL) {
|
||||||
noDataInWal = false;
|
noDataInWal = false;
|
||||||
code = tAppendDataToInputQueue(pTask, pItem);
|
code = streamTaskPutDataIntoInputQ(pTask, pItem);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
||||||
pTask->chkInfo.currentVer = ver;
|
pTask->chkInfo.currentVer = ver;
|
||||||
|
|
|
@ -560,7 +560,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
||||||
vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId);
|
vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId);
|
||||||
} else {
|
} else {
|
||||||
vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
|
vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
|
||||||
tqSetStreamTasksReadyAsync(pVnode->pTq);
|
tqCheckAndRunStreamTaskAsync(pVnode->pTq);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
|
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
|
||||||
|
|
|
@ -68,7 +68,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
|
||||||
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
|
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
|
||||||
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
|
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks);
|
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks);
|
||||||
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
|
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
|
||||||
|
|
||||||
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen);
|
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen);
|
||||||
|
|
|
@ -16,9 +16,6 @@
|
||||||
#include "streamInt.h"
|
#include "streamInt.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
|
||||||
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
|
|
||||||
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
|
|
||||||
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F)
|
|
||||||
SStreamGlobalEnv streamEnv;
|
SStreamGlobalEnv streamEnv;
|
||||||
|
|
||||||
int32_t streamInit() {
|
int32_t streamInit() {
|
||||||
|
@ -85,7 +82,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
|
||||||
|
|
||||||
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
|
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
|
||||||
pTrigger->pBlock->info.type = STREAM_GET_ALL;
|
pTrigger->pBlock->info.type = STREAM_GET_ALL;
|
||||||
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pTrigger) < 0) {
|
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger) < 0) {
|
||||||
taosFreeQitem(pTrigger);
|
taosFreeQitem(pTrigger);
|
||||||
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer);
|
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer);
|
||||||
return;
|
return;
|
||||||
|
@ -172,7 +169,7 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp
|
||||||
pTask->status.appendTranstateBlock = true;
|
pTask->status.appendTranstateBlock = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
|
int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pBlock);
|
||||||
// input queue is full, upstream is blocked now
|
// input queue is full, upstream is blocked now
|
||||||
status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;
|
status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;
|
||||||
}
|
}
|
||||||
|
@ -192,7 +189,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
|
||||||
pData->type = STREAM_INPUT__DATA_RETRIEVE;
|
pData->type = STREAM_INPUT__DATA_RETRIEVE;
|
||||||
pData->srcVgId = 0;
|
pData->srcVgId = 0;
|
||||||
streamRetrieveReqToData(pReq, pData);
|
streamRetrieveReqToData(pReq, pData);
|
||||||
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) {
|
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData) == 0) {
|
||||||
status = TASK_INPUT_STATUS__NORMAL;
|
status = TASK_INPUT_STATUS__NORMAL;
|
||||||
} else {
|
} else {
|
||||||
status = TASK_INPUT_STATUS__FAILED;
|
status = TASK_INPUT_STATUS__FAILED;
|
||||||
|
@ -239,47 +236,6 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) {
|
|
||||||
// int8_t status = 0;
|
|
||||||
//
|
|
||||||
// SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, pReq->type, pReq->srcVgId);
|
|
||||||
// if (pBlock == NULL) {
|
|
||||||
// streamTaskInputFail(pTask);
|
|
||||||
// status = TASK_INPUT_STATUS__FAILED;
|
|
||||||
// qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
|
|
||||||
// pTask->id.idStr);
|
|
||||||
// } else {
|
|
||||||
// if (pBlock->type == STREAM_INPUT__TRANS_STATE) {
|
|
||||||
// pTask->status.appendTranstateBlock = true;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
|
|
||||||
// // input queue is full, upstream is blocked now
|
|
||||||
// status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// return status;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void**
|
|
||||||
// pBuf) {
|
|
||||||
// *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
|
|
||||||
// if (*pBuf == NULL) {
|
|
||||||
// return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId);
|
|
||||||
// SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead));
|
|
||||||
//
|
|
||||||
// pDispatchRsp->inputStatus = status;
|
|
||||||
// pDispatchRsp->streamId = htobe64(pReq->streamId);
|
|
||||||
// pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
|
|
||||||
// pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
|
|
||||||
// pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
|
|
||||||
// pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
|
|
||||||
//
|
|
||||||
// return TSDB_CODE_SUCCESS;
|
|
||||||
// }
|
|
||||||
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
|
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
|
||||||
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
|
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
|
||||||
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
|
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
|
||||||
|
@ -343,98 +299,6 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tInputQueueIsFull(const SStreamTask* pTask) {
|
|
||||||
bool isFull = taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUE_CAPACITY;
|
|
||||||
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
|
|
||||||
return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
|
||||||
int8_t type = pItem->type;
|
|
||||||
int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1;
|
|
||||||
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
|
|
||||||
|
|
||||||
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
|
||||||
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
|
|
||||||
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) {
|
|
||||||
qError(
|
|
||||||
"s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push "
|
|
||||||
"data",
|
|
||||||
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
|
|
||||||
streamDataSubmitDestroy(px);
|
|
||||||
taosFreeQitem(pItem);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t msgLen = px->submit.msgLen;
|
|
||||||
int64_t ver = px->submit.ver;
|
|
||||||
|
|
||||||
int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
streamDataSubmitDestroy(px);
|
|
||||||
taosFreeQitem(pItem);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
// use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
|
|
||||||
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
|
||||||
msgLen, ver, total, size + SIZE_IN_MB(msgLen));
|
|
||||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
|
||||||
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
|
||||||
if (/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */ (tInputQueueIsFull(pTask))) {
|
|
||||||
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
|
||||||
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
|
|
||||||
destroyStreamDataBlock((SStreamDataBlock*)pItem);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
|
||||||
int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
destroyStreamDataBlock((SStreamDataBlock*)pItem);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
|
||||||
type == STREAM_INPUT__TRANS_STATE) {
|
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
|
||||||
qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
|
||||||
pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size);
|
|
||||||
} else if (type == STREAM_INPUT__GET_RES) {
|
|
||||||
// use the default memory limit, refactor later.
|
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
|
||||||
qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
|
|
||||||
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
|
|
||||||
qDebug("s-task:%s new data arrived, active the trigger, triggerStatus:%d", pTask->id.idStr, pTask->triggerStatus);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; }
|
|
||||||
|
|
||||||
void* streamQueueNextItem(SStreamQueue* pQueue) {
|
|
||||||
int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
|
|
||||||
|
|
||||||
if (flag == STREAM_QUEUE__FAILED) {
|
|
||||||
ASSERT(pQueue->qItem != NULL);
|
|
||||||
return streamQueueCurItem(pQueue);
|
|
||||||
} else {
|
|
||||||
pQueue->qItem = NULL;
|
|
||||||
taosGetQitem(pQueue->qall, &pQueue->qItem);
|
|
||||||
if (pQueue->qItem == NULL) {
|
|
||||||
taosReadAllQitems(pQueue->queue, pQueue->qall);
|
|
||||||
taosGetQitem(pQueue->qall, &pQueue->qItem);
|
|
||||||
}
|
|
||||||
|
|
||||||
return streamQueueCurItem(pQueue);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); }
|
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); }
|
||||||
|
|
||||||
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
|
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
|
||||||
|
|
|
@ -124,7 +124,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
|
||||||
taosArrayPush(pChkpoint->blocks, pBlock);
|
taosArrayPush(pChkpoint->blocks, pBlock);
|
||||||
|
|
||||||
taosMemoryFree(pBlock);
|
taosMemoryFree(pBlock);
|
||||||
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pChkpoint) < 0) {
|
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pChkpoint) < 0) {
|
||||||
taosFreeQitem(pChkpoint);
|
taosFreeQitem(pChkpoint);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
|
@ -166,6 +166,7 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo handle memory error
|
||||||
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
|
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
|
@ -195,7 +196,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
|
||||||
taosFreeQitem(pElem);
|
taosFreeQitem(pElem);
|
||||||
return (SStreamQueueItem*)pMerged;
|
return (SStreamQueueItem*)pMerged;
|
||||||
} else {
|
} else {
|
||||||
qDebug("block type:%d not merged with existed blocks list, type:%d", pElem->type, dst->type);
|
qDebug("block type:%d not merged with existed blocks list, type:%d", streamGetBlockTypeStr(pElem->type), dst->type);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -368,7 +368,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
pDelBlock->info.version = 0;
|
pDelBlock->info.version = 0;
|
||||||
pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
|
pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
|
||||||
pItem->pBlock = pDelBlock;
|
pItem->pBlock = pDelBlock;
|
||||||
int32_t code = tAppendDataToInputQueue(pStreamTask, (SStreamQueueItem*)pItem);
|
int32_t code = streamTaskPutDataIntoInputQ(pStreamTask, (SStreamQueueItem*)pItem);
|
||||||
qDebug("s-task:%s append dummy delete block,res:%d", pStreamTask->id.idStr, code);
|
qDebug("s-task:%s append dummy delete block,res:%d", pStreamTask->id.idStr, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -517,7 +517,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
// merge multiple input data if possible in the input queue.
|
// merge multiple input data if possible in the input queue.
|
||||||
qDebug("s-task:%s start to extract data block from inputQ", id);
|
qDebug("s-task:%s start to extract data block from inputQ", id);
|
||||||
|
|
||||||
/*int32_t code = */ extractBlocksFromInputQ(pTask, &pInput, &numOfBlocks);
|
/*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks);
|
||||||
if (pInput == NULL) {
|
if (pInput == NULL) {
|
||||||
ASSERT(numOfBlocks == 0);
|
ASSERT(numOfBlocks == 0);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -17,6 +17,28 @@
|
||||||
|
|
||||||
#define MAX_STREAM_EXEC_BATCH_NUM 32
|
#define MAX_STREAM_EXEC_BATCH_NUM 32
|
||||||
#define MIN_STREAM_EXEC_BATCH_NUM 4
|
#define MIN_STREAM_EXEC_BATCH_NUM 4
|
||||||
|
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
|
||||||
|
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
|
||||||
|
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F)
|
||||||
|
|
||||||
|
// todo refactor:
|
||||||
|
// read data from input queue
|
||||||
|
typedef struct SQueueReader {
|
||||||
|
SStreamQueue* pQueue;
|
||||||
|
int32_t taskLevel;
|
||||||
|
int32_t maxBlocks; // maximum block in one batch
|
||||||
|
int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms
|
||||||
|
} SQueueReader;
|
||||||
|
|
||||||
|
static void streamQueueCleanup(SStreamQueue* pQueue) {
|
||||||
|
void* qItem = NULL;
|
||||||
|
while ((qItem = streamQueueNextItem(pQueue)) != NULL) {
|
||||||
|
streamFreeQitem(qItem);
|
||||||
|
}
|
||||||
|
pQueue->status = STREAM_QUEUE__SUCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; }
|
||||||
|
|
||||||
SStreamQueue* streamQueueOpen(int64_t cap) {
|
SStreamQueue* streamQueueOpen(int64_t cap) {
|
||||||
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
|
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
|
||||||
|
@ -40,21 +62,6 @@ SStreamQueue* streamQueueOpen(int64_t cap) {
|
||||||
return pQueue;
|
return pQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamQueueCleanup(SStreamQueue* pQueue) {
|
|
||||||
void* qItem = NULL;
|
|
||||||
while ((qItem = streamQueueNextItem(pQueue)) != NULL) {
|
|
||||||
streamFreeQitem(qItem);
|
|
||||||
}
|
|
||||||
pQueue->status = STREAM_QUEUE__SUCESS;
|
|
||||||
}
|
|
||||||
// void streamQueueClose(SStreamQueue* pQueue) {
|
|
||||||
// streamQueueCleanup(pQueue);
|
|
||||||
|
|
||||||
// taosFreeQall(pQueue->qall);
|
|
||||||
// taosCloseQueue(pQueue->queue);
|
|
||||||
// taosMemoryFree(pQueue);
|
|
||||||
// }
|
|
||||||
|
|
||||||
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
|
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
|
||||||
qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue));
|
qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue));
|
||||||
streamQueueCleanup(pQueue);
|
streamQueueCleanup(pQueue);
|
||||||
|
@ -64,6 +71,24 @@ void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
|
||||||
taosMemoryFree(pQueue);
|
taosMemoryFree(pQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void* streamQueueNextItem(SStreamQueue* pQueue) {
|
||||||
|
int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
|
||||||
|
|
||||||
|
if (flag == STREAM_QUEUE__FAILED) {
|
||||||
|
ASSERT(pQueue->qItem != NULL);
|
||||||
|
return streamQueueCurItem(pQueue);
|
||||||
|
} else {
|
||||||
|
pQueue->qItem = NULL;
|
||||||
|
taosGetQitem(pQueue->qall, &pQueue->qItem);
|
||||||
|
if (pQueue->qItem == NULL) {
|
||||||
|
taosReadAllQitems(pQueue->queue, pQueue->qall);
|
||||||
|
taosGetQitem(pQueue->qall, &pQueue->qItem);
|
||||||
|
}
|
||||||
|
|
||||||
|
return streamQueueCurItem(pQueue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
bool streamQueueResEmpty(const SStreamQueueRes* pRes) {
|
bool streamQueueResEmpty(const SStreamQueueRes* pRes) {
|
||||||
//
|
//
|
||||||
|
@ -122,64 +147,13 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// todo refactor:
|
bool streamQueueIsFull(const STaosQueue* pQueue) {
|
||||||
// read data from input queue
|
bool isFull = taosQueueItemSize((STaosQueue*) pQueue) >= STREAM_TASK_INPUT_QUEUE_CAPACITY;
|
||||||
typedef struct SQueueReader {
|
double size = QUEUE_MEM_SIZE_IN_MB((STaosQueue*) pQueue);
|
||||||
SStreamQueue* pQueue;
|
return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE);
|
||||||
int32_t taskLevel;
|
|
||||||
int32_t maxBlocks; // maximum block in one batch
|
|
||||||
int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms
|
|
||||||
} SQueueReader;
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char* idstr) {
|
|
||||||
int32_t numOfBlocks = 0;
|
|
||||||
int32_t tryCount = 0;
|
|
||||||
SStreamQueueItem* pRet = NULL;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
SStreamQueueItem* qItem = streamQueueNextItem(pReader->pQueue);
|
|
||||||
if (qItem == NULL) {
|
|
||||||
if (pReader->taskLevel == TASK_LEVEL__SOURCE && numOfBlocks < MIN_STREAM_EXEC_BATCH_NUM && tryCount < pReader->waitDuration) {
|
|
||||||
tryCount++;
|
|
||||||
taosMsleep(1);
|
|
||||||
qDebug("try again batchSize:%d", numOfBlocks);
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("break batchSize:%d", numOfBlocks);
|
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) {
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pRet == NULL) {
|
|
||||||
pRet = qItem;
|
|
||||||
streamQueueProcessSuccess(pReader->pQueue);
|
|
||||||
if (pReader->taskLevel == TASK_LEVEL__SINK) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// todo we need to sort the data block, instead of just appending into the array list.
|
|
||||||
void* newRet = NULL;
|
|
||||||
if ((newRet = streamMergeQueueItem(pRet, qItem)) == NULL) {
|
|
||||||
streamQueueProcessFail(pReader->pQueue);
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
numOfBlocks++;
|
|
||||||
pRet = newRet;
|
|
||||||
streamQueueProcessSuccess(pReader->pQueue);
|
|
||||||
if (numOfBlocks > pReader->maxBlocks) {
|
|
||||||
qDebug("maximum blocks limit:%d reached, processing, %s", pReader->maxBlocks, idstr);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return pRet;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) {
|
|
||||||
int32_t retryTimes = 0;
|
int32_t retryTimes = 0;
|
||||||
int32_t MAX_RETRY_TIMES = 5;
|
int32_t MAX_RETRY_TIMES = 5;
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
@ -205,7 +179,6 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// non sink task
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
|
if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
|
||||||
qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
|
qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
|
||||||
|
@ -227,24 +200,17 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
|
||||||
// do not merge blocks for sink node and check point data block
|
// do not merge blocks for sink node and check point data block
|
||||||
if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
||||||
qItem->type == STREAM_INPUT__TRANS_STATE) {
|
qItem->type == STREAM_INPUT__TRANS_STATE) {
|
||||||
if (*pInput == NULL) {
|
const char* p = streamGetBlockTypeStr(qItem->type);
|
||||||
char* p = NULL;
|
|
||||||
if (qItem->type == STREAM_INPUT__CHECKPOINT) {
|
|
||||||
p = "checkpoint";
|
|
||||||
} else if (qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
|
||||||
p = "checkpoint-trigger";
|
|
||||||
} else {
|
|
||||||
p = "transtate";
|
|
||||||
}
|
|
||||||
|
|
||||||
|
if (*pInput == NULL) {
|
||||||
qDebug("s-task:%s %s msg extracted, start to process immediately", id, p);
|
qDebug("s-task:%s %s msg extracted, start to process immediately", id, p);
|
||||||
|
|
||||||
*numOfBlocks = 1;
|
*numOfBlocks = 1;
|
||||||
*pInput = qItem;
|
*pInput = qItem;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
// previous existed blocks needs to be handle, before handle the checkpoint msg block
|
// previous existed blocks needs to be handle, before handle the checkpoint msg block
|
||||||
qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous blocks, numOfBlocks:%d", id,
|
qDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
|
||||||
*numOfBlocks);
|
|
||||||
streamQueueProcessFail(pTask->inputQueue);
|
streamQueueProcessFail(pTask->inputQueue);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -256,7 +222,11 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
|
||||||
// todo we need to sort the data block, instead of just appending into the array list.
|
// todo we need to sort the data block, instead of just appending into the array list.
|
||||||
void* newRet = streamMergeQueueItem(*pInput, qItem);
|
void* newRet = streamMergeQueueItem(*pInput, qItem);
|
||||||
if (newRet == NULL) {
|
if (newRet == NULL) {
|
||||||
qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
|
if (terrno != 0) {
|
||||||
|
qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
|
||||||
|
tstrerror(terrno));
|
||||||
|
}
|
||||||
|
|
||||||
streamQueueProcessFail(pTask->inputQueue);
|
streamQueueProcessFail(pTask->inputQueue);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -274,3 +244,68 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
|
int8_t type = pItem->type;
|
||||||
|
int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1;
|
||||||
|
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
|
||||||
|
|
||||||
|
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
|
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
|
||||||
|
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputQueue->queue)) {
|
||||||
|
qError(
|
||||||
|
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
|
||||||
|
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
|
||||||
|
streamDataSubmitDestroy(px);
|
||||||
|
taosFreeQitem(pItem);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t msgLen = px->submit.msgLen;
|
||||||
|
int64_t ver = px->submit.ver;
|
||||||
|
|
||||||
|
int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
streamDataSubmitDestroy(px);
|
||||||
|
taosFreeQitem(pItem);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
|
||||||
|
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||||
|
msgLen, ver, total, size + SIZE_IN_MB(msgLen));
|
||||||
|
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
||||||
|
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
|
if (streamQueueIsFull(pTask->inputQueue->queue)) {
|
||||||
|
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
||||||
|
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
|
||||||
|
destroyStreamDataBlock((SStreamDataBlock*)pItem);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
||||||
|
int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
destroyStreamDataBlock((SStreamDataBlock*)pItem);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
||||||
|
type == STREAM_INPUT__TRANS_STATE) {
|
||||||
|
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
|
qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||||
|
pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size);
|
||||||
|
} else if (type == STREAM_INPUT__GET_RES) {
|
||||||
|
// use the default memory limit, refactor later.
|
||||||
|
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
|
qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
|
||||||
|
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
|
||||||
|
qDebug("s-task:%s new data arrived, active the trigger, triggerStatus:%d", pTask->id.idStr, pTask->triggerStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -388,7 +388,7 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) {
|
||||||
taosArrayPush(pTranstate->blocks, pBlock);
|
taosArrayPush(pTranstate->blocks, pBlock);
|
||||||
|
|
||||||
taosMemoryFree(pBlock);
|
taosMemoryFree(pBlock);
|
||||||
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pTranstate) < 0) {
|
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
|
||||||
taosFreeQitem(pTranstate);
|
taosFreeQitem(pTranstate);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
@ -624,7 +624,12 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((*pHTask)->status.downstreamReady == 1) {
|
||||||
|
qDebug("s-task:%s fill-history task is ready, no need to check downstream", (*pHTask)->id.idStr);
|
||||||
|
} else {
|
||||||
checkFillhistoryTaskStatus(pTask, *pHTask);
|
checkFillhistoryTaskStatus(pTask, *pHTask);
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,7 +27,7 @@ class TDTestCase:
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
tdLog.debug(f"start to excute {__file__}")
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
self.TDDnodes = None
|
self.TDDnodes = None
|
||||||
tdSql.init(conn.cursor())
|
tdSql.init(conn.cursor(), True)
|
||||||
self.host = socket.gethostname()
|
self.host = socket.gethostname()
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue