diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 13580b03fa..f4824078c4 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -40,13 +40,13 @@ SStreamQueue* streamQueueOpen(int64_t cap) { return pQueue; } -void streamQueueClose(SStreamQueue* pQueue) { - streamQueueCleanup(pQueue); +// void streamQueueClose(SStreamQueue* pQueue) { +// streamQueueCleanup(pQueue); - taosFreeQall(pQueue->qall); - taosCloseQueue(pQueue->queue); - taosMemoryFree(pQueue); -} +// taosFreeQall(pQueue->qall); +// taosCloseQueue(pQueue->queue); +// taosMemoryFree(pQueue); +// } void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue)); @@ -54,7 +54,7 @@ void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { while ((qItem = streamQueueNextItem(pQueue)) != NULL) { streamFreeQitem(qItem); } - + pQueue->status = STREAM_QUEUE__SUCESS; taosFreeQall(pQueue->qall); taosCloseQueue(pQueue->queue); @@ -119,14 +119,13 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) { } #endif - // todo refactor: // read data from input queue typedef struct SQueueReader { SStreamQueue* pQueue; - int32_t taskLevel; - int32_t maxBlocks; // maximum block in one batch - int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms + int32_t taskLevel; + int32_t maxBlocks; // maximum block in one batch + int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms } SQueueReader; #if 0 @@ -226,7 +225,6 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER || qItem->type == STREAM_INPUT__TRANS_STATE) { if (*pInput == NULL) { - char* p = NULL; if (qItem->type == STREAM_INPUT__CHECKPOINT) { p = "checkpoint"; @@ -242,7 +240,8 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i return TSDB_CODE_SUCCESS; } else { // previous existed blocks needs to be handle, before handle the checkpoint msg block - qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous blocks, numOfBlocks:%d", id, *numOfBlocks); + qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous blocks, numOfBlocks:%d", id, + *numOfBlocks); streamQueueProcessFail(pTask->inputQueue); return TSDB_CODE_SUCCESS; }