fix compile error

This commit is contained in:
yihaoDeng 2023-08-25 09:02:56 +00:00
parent 60d1df0e25
commit a02b35ed86
1 changed files with 12 additions and 13 deletions

View File

@ -40,13 +40,13 @@ SStreamQueue* streamQueueOpen(int64_t cap) {
return pQueue; return pQueue;
} }
void streamQueueClose(SStreamQueue* pQueue) { // void streamQueueClose(SStreamQueue* pQueue) {
streamQueueCleanup(pQueue); // streamQueueCleanup(pQueue);
taosFreeQall(pQueue->qall); // taosFreeQall(pQueue->qall);
taosCloseQueue(pQueue->queue); // taosCloseQueue(pQueue->queue);
taosMemoryFree(pQueue); // taosMemoryFree(pQueue);
} // }
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue)); qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue));
@ -54,7 +54,7 @@ void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
while ((qItem = streamQueueNextItem(pQueue)) != NULL) { while ((qItem = streamQueueNextItem(pQueue)) != NULL) {
streamFreeQitem(qItem); streamFreeQitem(qItem);
} }
pQueue->status = STREAM_QUEUE__SUCESS; pQueue->status = STREAM_QUEUE__SUCESS;
taosFreeQall(pQueue->qall); taosFreeQall(pQueue->qall);
taosCloseQueue(pQueue->queue); taosCloseQueue(pQueue->queue);
@ -119,14 +119,13 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
} }
#endif #endif
// todo refactor: // todo refactor:
// read data from input queue // read data from input queue
typedef struct SQueueReader { typedef struct SQueueReader {
SStreamQueue* pQueue; SStreamQueue* pQueue;
int32_t taskLevel; int32_t taskLevel;
int32_t maxBlocks; // maximum block in one batch int32_t maxBlocks; // maximum block in one batch
int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms
} SQueueReader; } SQueueReader;
#if 0 #if 0
@ -226,7 +225,6 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER || if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
qItem->type == STREAM_INPUT__TRANS_STATE) { qItem->type == STREAM_INPUT__TRANS_STATE) {
if (*pInput == NULL) { if (*pInput == NULL) {
char* p = NULL; char* p = NULL;
if (qItem->type == STREAM_INPUT__CHECKPOINT) { if (qItem->type == STREAM_INPUT__CHECKPOINT) {
p = "checkpoint"; p = "checkpoint";
@ -242,7 +240,8 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
// previous existed blocks needs to be handle, before handle the checkpoint msg block // previous existed blocks needs to be handle, before handle the checkpoint msg block
qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous blocks, numOfBlocks:%d", id, *numOfBlocks); qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous blocks, numOfBlocks:%d", id,
*numOfBlocks);
streamQueueProcessFail(pTask->inputQueue); streamQueueProcessFail(pTask->inputQueue);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }