refactor: do some internal refactor.
This commit is contained in:
parent
13c20facd0
commit
f6a097d96f
|
@ -434,8 +434,9 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo)
|
||||||
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId);
|
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId);
|
||||||
|
|
||||||
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem);
|
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem);
|
||||||
|
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock);
|
||||||
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask);
|
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask);
|
||||||
bool streamQueueIsFull(const STaosQueue* pQueue);
|
bool streamQueueIsFull(const STaosQueue* pQueue, bool inputQ);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
|
@ -645,12 +646,10 @@ SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t
|
||||||
void streamTaskInputFail(SStreamTask* pTask);
|
void streamTaskInputFail(SStreamTask* pTask);
|
||||||
int32_t streamTryExec(SStreamTask* pTask);
|
int32_t streamTryExec(SStreamTask* pTask);
|
||||||
int32_t streamSchedExec(SStreamTask* pTask);
|
int32_t streamSchedExec(SStreamTask* pTask);
|
||||||
int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
|
|
||||||
bool streamTaskShouldStop(const SStreamStatus* pStatus);
|
bool streamTaskShouldStop(const SStreamStatus* pStatus);
|
||||||
bool streamTaskShouldPause(const SStreamStatus* pStatus);
|
bool streamTaskShouldPause(const SStreamStatus* pStatus);
|
||||||
bool streamTaskIsIdle(const SStreamTask* pTask);
|
bool streamTaskIsIdle(const SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
|
|
||||||
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
|
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
|
||||||
|
|
||||||
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
|
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
|
||||||
|
|
|
@ -374,7 +374,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamQueueIsFull(pTask->inputInfo.queue->pQueue)) {
|
if (streamQueueIsFull(pTask->inputInfo.queue->pQueue, true)) {
|
||||||
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
|
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -212,36 +212,6 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
|
||||||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t type = pTask->outputInfo.type;
|
|
||||||
if (type == TASK_OUTPUT__TABLE) {
|
|
||||||
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->blocks);
|
|
||||||
destroyStreamDataBlock(pBlock);
|
|
||||||
} else if (type == TASK_OUTPUT__SMA) {
|
|
||||||
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
|
|
||||||
destroyStreamDataBlock(pBlock);
|
|
||||||
} else {
|
|
||||||
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
|
|
||||||
STaosQueue* pQueue = pTask->outputInfo.queue->pQueue;
|
|
||||||
code = taosWriteQitem(pQueue, pBlock);
|
|
||||||
|
|
||||||
int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue);
|
|
||||||
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
|
||||||
if (code != 0) {
|
|
||||||
qError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
|
|
||||||
pTask->id.idStr, total, size, tstrerror(code));
|
|
||||||
} else {
|
|
||||||
qInfo("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
|
||||||
}
|
|
||||||
|
|
||||||
streamDispatchStreamBlock(pTask);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
|
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
|
||||||
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
|
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
|
||||||
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
|
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
|
||||||
|
|
|
@ -499,8 +499,8 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
|
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputInfo.queue);
|
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputInfo.queue);
|
||||||
double size = SIZE_IN_MB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue));
|
|
||||||
if (numOfElems > 0) {
|
if (numOfElems > 0) {
|
||||||
|
double size = SIZE_IN_MB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue));
|
||||||
qDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size);
|
qDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,33 +32,58 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) {
|
||||||
return (status == TASK_STATUS__PAUSE);
|
return (status == TASK_STATUS__PAUSE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t type = pTask->outputInfo.type;
|
||||||
|
if (type == TASK_OUTPUT__TABLE) {
|
||||||
|
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->blocks);
|
||||||
|
destroyStreamDataBlock(pBlock);
|
||||||
|
} else if (type == TASK_OUTPUT__SMA) {
|
||||||
|
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
|
||||||
|
destroyStreamDataBlock(pBlock);
|
||||||
|
} else {
|
||||||
|
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
|
||||||
|
code = streamTaskPutDataIntoOutputQ(pTask, pBlock);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
streamDispatchStreamBlock(pTask);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
|
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
|
||||||
int32_t* totalBlocks) {
|
int32_t* totalBlocks) {
|
||||||
int32_t numOfBlocks = taosArrayGetSize(pRes);
|
int32_t numOfBlocks = taosArrayGetSize(pRes);
|
||||||
if (numOfBlocks > 0) {
|
if (numOfBlocks == 0) {
|
||||||
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
|
|
||||||
if (pStreamBlocks == NULL) {
|
|
||||||
qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno));
|
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
|
|
||||||
SIZE_IN_MB(size));
|
|
||||||
|
|
||||||
int32_t code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
|
|
||||||
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
|
|
||||||
destroyStreamDataBlock(pStreamBlocks);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
*totalSize += size;
|
|
||||||
*totalBlocks += numOfBlocks;
|
|
||||||
} else {
|
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
|
||||||
|
if (pStreamBlocks == NULL) {
|
||||||
|
qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno));
|
||||||
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
|
||||||
|
SIZE_IN_MB(size));
|
||||||
|
|
||||||
|
int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) { // back pressure and record position
|
||||||
|
//code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY
|
||||||
|
destroyStreamDataBlock(pStreamBlocks);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
*totalSize += size;
|
||||||
|
*totalBlocks += numOfBlocks;
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
|
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
|
||||||
|
@ -236,7 +261,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
|
||||||
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||||
qRes->blocks = pRes;
|
qRes->blocks = pRes;
|
||||||
|
|
||||||
code = streamTaskOutputResultBlock(pTask, qRes);
|
code = doOutputResultBlockImpl(pTask, qRes);
|
||||||
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
|
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
taosFreeQitem(qRes);
|
taosFreeQitem(qRes);
|
||||||
|
@ -536,7 +561,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
|
|
||||||
if (type == STREAM_INPUT__DATA_BLOCK) {
|
if (type == STREAM_INPUT__DATA_BLOCK) {
|
||||||
qDebug("s-task:%s sink task start to sink %d blocks", id, numOfBlocks);
|
qDebug("s-task:%s sink task start to sink %d blocks", id, numOfBlocks);
|
||||||
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
|
doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,10 +15,11 @@
|
||||||
|
|
||||||
#include "streamInt.h"
|
#include "streamInt.h"
|
||||||
|
|
||||||
#define MAX_STREAM_EXEC_BATCH_NUM 32
|
#define MAX_STREAM_EXEC_BATCH_NUM 32
|
||||||
#define MIN_STREAM_EXEC_BATCH_NUM 4
|
#define MIN_STREAM_EXEC_BATCH_NUM 4
|
||||||
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
|
#define STREAM_TASK_QUEUE_CAPACITY 20480
|
||||||
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
|
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
|
||||||
|
#define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50)
|
||||||
|
|
||||||
// todo refactor:
|
// todo refactor:
|
||||||
// read data from input queue
|
// read data from input queue
|
||||||
|
@ -159,10 +160,15 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
bool streamQueueIsFull(const STaosQueue* pQueue) {
|
bool streamQueueIsFull(const STaosQueue* pQueue, bool inputQ) {
|
||||||
bool isFull = taosQueueItemSize((STaosQueue*) pQueue) >= STREAM_TASK_INPUT_QUEUE_CAPACITY;
|
bool isFull = taosQueueItemSize((STaosQueue*)pQueue) >= STREAM_TASK_QUEUE_CAPACITY;
|
||||||
double size = SIZE_IN_MB(taosQueueMemorySize((STaosQueue*) pQueue));
|
if (isFull) {
|
||||||
return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE);
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t threahold = (inputQ) ? STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE : STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE;
|
||||||
|
double size = SIZE_IN_MB(taosQueueMemorySize((STaosQueue*)pQueue));
|
||||||
|
return (size >= threahold);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
|
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
|
||||||
|
@ -275,15 +281,15 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
||||||
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
int8_t type = pItem->type;
|
int8_t type = pItem->type;
|
||||||
STaosQueue* pQueue = pTask->inputInfo.queue->pQueue;
|
STaosQueue* pQueue = pTask->inputInfo.queue->pQueue;
|
||||||
int32_t total = taosQueueItemSize(pQueue) + 1;
|
int32_t total = streamQueueGetNumOfItems(pTask->inputInfo.queue) + 1;
|
||||||
|
|
||||||
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
|
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
|
||||||
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) {
|
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue, true)) {
|
||||||
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||||
qTrace(
|
qTrace(
|
||||||
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
|
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
|
||||||
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
|
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
|
||||||
streamDataSubmitDestroy(px);
|
streamDataSubmitDestroy(px);
|
||||||
taosFreeQitem(pItem);
|
taosFreeQitem(pItem);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -306,11 +312,11 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
||||||
msgLen, ver, total, size + SIZE_IN_MB(msgLen));
|
msgLen, ver, total, size + SIZE_IN_MB(msgLen));
|
||||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
||||||
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
if (streamQueueIsFull(pQueue)) {
|
if (streamQueueIsFull(pQueue, true)) {
|
||||||
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||||
|
|
||||||
qTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
qTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
||||||
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
|
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
|
||||||
destroyStreamDataBlock((SStreamDataBlock*)pItem);
|
destroyStreamDataBlock((SStreamDataBlock*)pItem);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -356,6 +362,38 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// the result should be put into the outputQ in any cases, otherwise, the result may be lost
|
||||||
|
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||||
|
STaosQueue* pQueue = pTask->outputInfo.queue->pQueue;
|
||||||
|
|
||||||
|
while (streamQueueIsFull(pQueue, false)) {
|
||||||
|
if (streamTaskShouldStop(&pTask->status)) {
|
||||||
|
qInfo("s-task:%s discard result block due to task stop", pTask->id.idStr);
|
||||||
|
return TSDB_CODE_STREAM_EXEC_CANCELLED;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue);
|
||||||
|
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||||
|
// let's wait for there are enough space to hold this result pBlock
|
||||||
|
qDebug("s-task:%s outputQ is full, wait for 500ms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr,
|
||||||
|
total, size);
|
||||||
|
taosMsleep(500);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = taosWriteQitem(pQueue, pBlock);
|
||||||
|
|
||||||
|
int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue);
|
||||||
|
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||||
|
if (code != 0) {
|
||||||
|
qError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
|
||||||
|
pTask->id.idStr, total + 1, size, tstrerror(code));
|
||||||
|
} else {
|
||||||
|
qInfo("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) {
|
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) {
|
||||||
if (cap < 100 || rate < 50 || pBucket == NULL) {
|
if (cap < 100 || rate < 50 || pBucket == NULL) {
|
||||||
qError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate);
|
qError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate);
|
||||||
|
|
Loading…
Reference in New Issue