Merge pull request #22866 from taosdata/fix/3_liaohj
refactor: do some internal refactor.
This commit is contained in:
commit
4325a0a433
|
@ -111,14 +111,12 @@ typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
int64_t ver;
|
int64_t ver;
|
||||||
int32_t* dataRef;
|
|
||||||
SPackedData submit;
|
SPackedData submit;
|
||||||
} SStreamDataSubmit;
|
} SStreamDataSubmit;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
int64_t ver;
|
int64_t ver;
|
||||||
SArray* dataRefs; // SArray<int32_t*>
|
|
||||||
SArray* submits; // SArray<SPackedSubmit>
|
SArray* submits; // SArray<SPackedSubmit>
|
||||||
} SStreamMergedSubmit;
|
} SStreamMergedSubmit;
|
||||||
|
|
||||||
|
@ -672,7 +670,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
|
||||||
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
|
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
|
||||||
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
|
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
|
||||||
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
|
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
|
||||||
int32_t streamTaskGetInputQItems(const SStreamTask* pTask);
|
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
|
||||||
|
|
||||||
// common
|
// common
|
||||||
int32_t streamRestoreParam(SStreamTask* pTask);
|
int32_t streamRestoreParam(SStreamTask* pTask);
|
||||||
|
@ -696,7 +694,7 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
|
||||||
// source level
|
// source level
|
||||||
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
||||||
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
||||||
int32_t streamSourceScanHistoryData(SStreamTask* pTask);
|
int32_t streamScanHistoryData(SStreamTask* pTask);
|
||||||
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
|
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
|
||||||
|
|
||||||
// agg level
|
// agg level
|
||||||
|
|
|
@ -1048,7 +1048,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
ASSERT(pTask->status.pauseAllowed == true);
|
ASSERT(pTask->status.pauseAllowed == true);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamSourceScanHistoryData(pTask);
|
streamScanHistoryData(pTask);
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
|
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
|
||||||
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
|
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
|
||||||
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el,
|
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el,
|
||||||
|
|
|
@ -537,7 +537,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock
|
||||||
if (k == 0) {
|
if (k == 0) {
|
||||||
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
|
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
|
||||||
void* colData = colDataGetData(pColData, j);
|
void* colData = colDataGetData(pColData, j);
|
||||||
tqDebug("s-task:%s tq sink pipe2, row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData);
|
tqDebug("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IS_SET_NULL(pCol)) {
|
if (IS_SET_NULL(pCol)) {
|
||||||
|
|
|
@ -396,7 +396,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfItems = streamTaskGetInputQItems(pTask);
|
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputInfo.queue);
|
||||||
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
|
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
|
||||||
|
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
|
|
@ -177,7 +177,7 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
|
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
|
||||||
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock));
|
||||||
int8_t status = TASK_INPUT_STATUS__NORMAL;
|
int8_t status = TASK_INPUT_STATUS__NORMAL;
|
||||||
|
|
||||||
// enqueue
|
// enqueue
|
||||||
|
@ -223,9 +223,16 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
|
||||||
destroyStreamDataBlock(pBlock);
|
destroyStreamDataBlock(pBlock);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
|
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
|
||||||
code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock);
|
STaosQueue* pQueue = pTask->outputInfo.queue->pQueue;
|
||||||
|
code = taosWriteQitem(pQueue, pBlock);
|
||||||
|
|
||||||
|
int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue);
|
||||||
|
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
qError("s-task:%s failed to put res into outputQ", pTask->id.idStr);
|
qError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
|
||||||
|
pTask->id.idStr, total, size, tstrerror(code));
|
||||||
|
} else {
|
||||||
|
qInfo("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamDispatchStreamBlock(pTask);
|
streamDispatchStreamBlock(pTask);
|
||||||
|
|
|
@ -115,28 +115,16 @@ SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
|
|
||||||
if (pDataSubmit->dataRef == NULL) {
|
|
||||||
taosFreeQitem(pDataSubmit);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pDataSubmit->ver = pData->ver;
|
pDataSubmit->ver = pData->ver;
|
||||||
pDataSubmit->submit = *pData;
|
pDataSubmit->submit = *pData;
|
||||||
*pDataSubmit->dataRef = 1; // initialize the reference count to be 1
|
|
||||||
pDataSubmit->type = type;
|
pDataSubmit->type = type;
|
||||||
|
|
||||||
return pDataSubmit;
|
return pDataSubmit;
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
|
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
|
||||||
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
|
ASSERT(pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);
|
||||||
ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);
|
taosMemoryFree(pDataSubmit->submit.msgStr);
|
||||||
|
|
||||||
if (ref == 0) {
|
|
||||||
taosMemoryFree(pDataSubmit->submit.msgStr);
|
|
||||||
taosMemoryFree(pDataSubmit->dataRef);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamMergedSubmit* streamMergedSubmitNew() {
|
SStreamMergedSubmit* streamMergedSubmitNew() {
|
||||||
|
@ -146,11 +134,8 @@ SStreamMergedSubmit* streamMergedSubmitNew() {
|
||||||
}
|
}
|
||||||
|
|
||||||
pMerged->submits = taosArrayInit(0, sizeof(SPackedData));
|
pMerged->submits = taosArrayInit(0, sizeof(SPackedData));
|
||||||
pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
|
if (pMerged->submits == NULL) {
|
||||||
|
|
||||||
if (pMerged->dataRefs == NULL || pMerged->submits == NULL) {
|
|
||||||
taosArrayDestroy(pMerged->submits);
|
taosArrayDestroy(pMerged->submits);
|
||||||
taosArrayDestroy(pMerged->dataRefs);
|
|
||||||
taosFreeQitem(pMerged);
|
taosFreeQitem(pMerged);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -160,9 +145,10 @@ SStreamMergedSubmit* streamMergedSubmitNew() {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
|
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
|
||||||
taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef);
|
|
||||||
taosArrayPush(pMerged->submits, &pSubmit->submit);
|
taosArrayPush(pMerged->submits, &pSubmit->submit);
|
||||||
pMerged->ver = pSubmit->ver;
|
if (pSubmit->ver > pMerged->ver) {
|
||||||
|
pMerged->ver = pSubmit->ver;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -222,18 +208,10 @@ void streamFreeQitem(SStreamQueueItem* data) {
|
||||||
|
|
||||||
int32_t sz = taosArrayGetSize(pMerge->submits);
|
int32_t sz = taosArrayGetSize(pMerge->submits);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i);
|
SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i);
|
||||||
int32_t ref = atomic_sub_fetch_32(pRef, 1);
|
taosMemoryFree(pSubmit->msgStr);
|
||||||
ASSERT(ref >= 0);
|
|
||||||
|
|
||||||
if (ref == 0) {
|
|
||||||
SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i);
|
|
||||||
taosMemoryFree(pSubmit->msgStr);
|
|
||||||
taosMemoryFree(pRef);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pMerge->submits);
|
taosArrayDestroy(pMerge->submits);
|
||||||
taosArrayDestroy(pMerge->dataRefs);
|
|
||||||
taosFreeQitem(pMerge);
|
taosFreeQitem(pMerge);
|
||||||
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
|
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;
|
SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;
|
||||||
|
|
|
@ -498,9 +498,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
|
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
|
||||||
|
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int32_t numOfElems = taosQueueItemSize(pTask->outputInfo.queue->pQueue);
|
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputInfo.queue);
|
||||||
|
double size = SIZE_IN_MB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue));
|
||||||
if (numOfElems > 0) {
|
if (numOfElems > 0) {
|
||||||
qDebug("s-task:%s try to dispatch intermediate block to downstream, elem in outputQ:%d", id, numOfElems);
|
qDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
// to make sure only one dispatch is running
|
// to make sure only one dispatch is running
|
||||||
|
|
|
@ -164,11 +164,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
|
int32_t streamScanHistoryData(SStreamTask* pTask) {
|
||||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
void* exec = pTask->exec.pExecutor;
|
void* exec = pTask->exec.pExecutor;
|
||||||
bool finished = false;
|
bool finished = false;
|
||||||
|
int32_t outputBatchSize = 100;
|
||||||
|
|
||||||
qSetStreamOpOpen(exec);
|
qSetStreamOpOpen(exec);
|
||||||
|
|
||||||
|
@ -217,8 +219,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
|
||||||
block.info.childId = pTask->info.selfChildId;
|
block.info.childId = pTask->info.selfChildId;
|
||||||
taosArrayPush(pRes, &block);
|
taosArrayPush(pRes, &block);
|
||||||
|
|
||||||
if ((++numOfBlocks) >= batchSize) {
|
if ((++numOfBlocks) >= outputBatchSize) {
|
||||||
qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, batchSize);
|
qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, outputBatchSize);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -248,13 +250,6 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskGetInputQItems(const SStreamTask* pTask) {
|
|
||||||
int32_t numOfItems1 = taosQueueItemSize(pTask->inputInfo.queue->pQueue);
|
|
||||||
int32_t numOfItems2 = taosQallItemSize(pTask->inputInfo.queue->qall);
|
|
||||||
|
|
||||||
return numOfItems1 + numOfItems2;
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait for the stream task to be idle
|
// wait for the stream task to be idle
|
||||||
static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
@ -576,7 +571,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
// todo other thread may change the status
|
// todo other thread may change the status
|
||||||
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
|
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
|
||||||
if (type == STREAM_INPUT__CHECKPOINT) {
|
if (type == STREAM_INPUT__CHECKPOINT) {
|
||||||
// ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
|
|
||||||
qDebug("s-task:%s checkpoint block received, set the status:%s", pTask->id.idStr,
|
qDebug("s-task:%s checkpoint block received, set the status:%s", pTask->id.idStr,
|
||||||
streamGetTaskStatusStr(pTask->status.taskStatus));
|
streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
streamTaskBuildCheckpoint(pTask);
|
streamTaskBuildCheckpoint(pTask);
|
||||||
|
@ -608,8 +602,6 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// streamTaskBuildCheckpoint(pTask);
|
|
||||||
|
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
|
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||||
pTask->status.schedStatus);
|
pTask->status.schedStatus);
|
||||||
|
|
|
@ -165,6 +165,13 @@ bool streamQueueIsFull(const STaosQueue* pQueue) {
|
||||||
return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE);
|
return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
|
||||||
|
int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
|
||||||
|
int32_t numOfItems2 = taosQallItemSize(pQueue->qall);
|
||||||
|
|
||||||
|
return numOfItems1 + numOfItems2;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) {
|
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) {
|
||||||
int32_t retryTimes = 0;
|
int32_t retryTimes = 0;
|
||||||
int32_t MAX_RETRY_TIMES = 5;
|
int32_t MAX_RETRY_TIMES = 5;
|
||||||
|
@ -269,14 +276,14 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
||||||
int8_t type = pItem->type;
|
int8_t type = pItem->type;
|
||||||
STaosQueue* pQueue = pTask->inputInfo.queue->pQueue;
|
STaosQueue* pQueue = pTask->inputInfo.queue->pQueue;
|
||||||
int32_t total = taosQueueItemSize(pQueue) + 1;
|
int32_t total = taosQueueItemSize(pQueue) + 1;
|
||||||
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
|
||||||
|
|
||||||
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
|
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
|
||||||
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) {
|
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) {
|
||||||
// qError(
|
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||||
// "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
|
qTrace(
|
||||||
// pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
|
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
|
||||||
|
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
|
||||||
streamDataSubmitDestroy(px);
|
streamDataSubmitDestroy(px);
|
||||||
taosFreeQitem(pItem);
|
taosFreeQitem(pItem);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -292,24 +299,30 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||||
|
|
||||||
// use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
|
// use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
|
||||||
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||||
msgLen, ver, total, size + SIZE_IN_MB(msgLen));
|
msgLen, ver, total, size + SIZE_IN_MB(msgLen));
|
||||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
||||||
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
if (streamQueueIsFull(pQueue)) {
|
if (streamQueueIsFull(pQueue)) {
|
||||||
// qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||||
// pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
|
|
||||||
|
qTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
||||||
|
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
|
||||||
destroyStreamDataBlock((SStreamDataBlock*)pItem);
|
destroyStreamDataBlock((SStreamDataBlock*)pItem);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
|
||||||
int32_t code = taosWriteQitem(pQueue, pItem);
|
int32_t code = taosWriteQitem(pQueue, pItem);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
destroyStreamDataBlock((SStreamDataBlock*)pItem);
|
destroyStreamDataBlock((SStreamDataBlock*)pItem);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||||
|
qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
||||||
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
||||||
type == STREAM_INPUT__TRANS_STATE) {
|
type == STREAM_INPUT__TRANS_STATE) {
|
||||||
int32_t code = taosWriteQitem(pQueue, pItem);
|
int32_t code = taosWriteQitem(pQueue, pItem);
|
||||||
|
@ -317,6 +330,8 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
||||||
taosFreeQitem(pItem);
|
taosFreeQitem(pItem);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||||
qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||||
pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size);
|
pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size);
|
||||||
} else if (type == STREAM_INPUT__GET_RES) {
|
} else if (type == STREAM_INPUT__GET_RES) {
|
||||||
|
@ -326,6 +341,8 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
||||||
taosFreeQitem(pItem);
|
taosFreeQitem(pItem);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
|
||||||
qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
|
qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
|
|
@ -371,10 +371,6 @@ int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
|
|
||||||
return streamScanExec(pTask, 100);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
|
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
|
||||||
SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
|
SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
|
||||||
if (pTranstate == NULL) {
|
if (pTranstate == NULL) {
|
||||||
|
|
Loading…
Reference in New Issue