commit
f7dada53b9
|
@ -189,7 +189,7 @@ int32_t streamInit();
|
|||
void streamCleanUp();
|
||||
|
||||
SStreamQueue* streamQueueOpen(int64_t cap);
|
||||
void streamQueueClose(SStreamQueue* queue);
|
||||
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
|
||||
|
||||
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
|
||||
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
|
||||
|
|
|
@ -296,53 +296,70 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
|
|||
}
|
||||
|
||||
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
|
||||
int32_t code = walNextValidMsg(pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
int32_t code = 0;
|
||||
|
||||
while(1) {
|
||||
code = walNextValidMsg(pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
SWalCont* pCont = &pReader->pHead->head;
|
||||
int64_t ver = pCont->version;
|
||||
if (ver > maxVer) {
|
||||
tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pCont->msgType == TDMT_VND_SUBMIT) {
|
||||
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
|
||||
int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);
|
||||
|
||||
void* data = taosMemoryMalloc(len);
|
||||
if (data == NULL) {
|
||||
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
terrno = code;
|
||||
|
||||
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
|
||||
return code;
|
||||
}
|
||||
|
||||
memcpy(data, pBody, len);
|
||||
SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
|
||||
|
||||
*pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
|
||||
if (*pItem == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
terrno = code;
|
||||
tqError("%s failed to create data submit for stream since out of memory", id);
|
||||
return code;
|
||||
}
|
||||
} else if (pCont->msgType == TDMT_VND_DELETE) {
|
||||
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
|
||||
int32_t len = pCont->bodyLen - sizeof(SMsgHead);
|
||||
|
||||
code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
if (*pItem == NULL) {
|
||||
tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
|
||||
// we need to continue check next data in the wal files.
|
||||
continue;
|
||||
} else {
|
||||
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
|
||||
}
|
||||
} else {
|
||||
terrno = code;
|
||||
tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int64_t ver = pReader->pHead->head.version;
|
||||
if (ver > maxVer) {
|
||||
tqDebug("maxVer in WAL:%"PRId64" reached current:%"PRId64", do not scan wal anymore, %s", maxVer, ver, id);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
|
||||
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
||||
int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
|
||||
|
||||
void* data = taosMemoryMalloc(len);
|
||||
if (data == NULL) {
|
||||
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(data, pBody, len);
|
||||
SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
|
||||
|
||||
*pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
|
||||
if (*pItem == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tqError("%s failed to create data submit for stream since out of memory", id);
|
||||
return terrno;
|
||||
}
|
||||
} else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) {
|
||||
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead));
|
||||
int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);
|
||||
|
||||
code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
|
||||
} else {
|
||||
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
|
||||
}
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// todo ignore the error in wal?
|
||||
|
|
|
@ -780,6 +780,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
}
|
||||
|
||||
streamFreeQitem(pTask->msgInfo.pData);
|
||||
pTask->msgInfo.pData = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -35,18 +35,17 @@ FAIL:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void streamQueueClose(SStreamQueue* queue) {
|
||||
while (1) {
|
||||
void* qItem = streamQueueNextItem(queue);
|
||||
if (qItem) {
|
||||
streamFreeQitem(qItem);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
|
||||
qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue));
|
||||
|
||||
void* qItem = NULL;
|
||||
while ((qItem = streamQueueNextItem(pQueue)) != NULL) {
|
||||
streamFreeQitem(qItem);
|
||||
}
|
||||
taosFreeQall(queue->qall);
|
||||
taosCloseQueue(queue->queue);
|
||||
taosMemoryFree(queue);
|
||||
|
||||
taosFreeQall(pQueue->qall);
|
||||
taosCloseQueue(pQueue->queue);
|
||||
taosMemoryFree(pQueue);
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
|
|
@ -236,11 +236,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
|
||||
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
|
||||
if (pTask->inputQueue) {
|
||||
streamQueueClose(pTask->inputQueue);
|
||||
streamQueueClose(pTask->inputQueue, pTask->id.taskId);
|
||||
}
|
||||
|
||||
if (pTask->outputInfo.queue) {
|
||||
streamQueueClose(pTask->outputInfo.queue);
|
||||
streamQueueClose(pTask->outputInfo.queue, pTask->id.taskId);
|
||||
}
|
||||
|
||||
if (pTask->exec.qmsg) {
|
||||
|
@ -271,6 +271,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING);
|
||||
}
|
||||
|
||||
if (pTask->msgInfo.pData != NULL) {
|
||||
destroyStreamDataBlock(pTask->msgInfo.pData);
|
||||
pTask->msgInfo.pData = NULL;
|
||||
}
|
||||
|
||||
if (pTask->id.idStr != NULL) {
|
||||
taosMemoryFree((void*)pTask->id.idStr);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue