fix(stream): continue check wal when meeting empty delete block msg.
This commit is contained in:
parent
5445e836de
commit
4c6bc4d2c3
|
@ -296,53 +296,70 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
|
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
|
||||||
int32_t code = walNextValidMsg(pReader);
|
int32_t code = 0;
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
SWalCont* pCont = &pReader->pHead->head;
|
||||||
|
|
||||||
|
while(1) {
|
||||||
|
code = walNextValidMsg(pReader);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t ver = pCont->version;
|
||||||
|
if (ver > maxVer) {
|
||||||
|
tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pCont->msgType == TDMT_VND_SUBMIT) {
|
||||||
|
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
|
||||||
|
int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);
|
||||||
|
|
||||||
|
void* data = taosMemoryMalloc(len);
|
||||||
|
if (data == NULL) {
|
||||||
|
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
terrno = code;
|
||||||
|
|
||||||
|
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(data, pBody, len);
|
||||||
|
SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
|
||||||
|
|
||||||
|
*pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
|
||||||
|
if (*pItem == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
terrno = code;
|
||||||
|
tqError("%s failed to create data submit for stream since out of memory", id);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
} else if (pCont->msgType == TDMT_VND_DELETE) {
|
||||||
|
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
|
||||||
|
int32_t len = pCont->bodyLen - sizeof(SMsgHead);
|
||||||
|
|
||||||
|
code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
if (*pItem == NULL) {
|
||||||
|
tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
|
||||||
|
// we need to continue check next data in the wal files.
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
terrno = code;
|
||||||
|
tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t ver = pReader->pHead->head.version;
|
|
||||||
if (ver > maxVer) {
|
|
||||||
tqDebug("maxVer in WAL:%"PRId64" reached current:%"PRId64", do not scan wal anymore, %s", maxVer, ver, id);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
|
|
||||||
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
|
||||||
int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
|
|
||||||
|
|
||||||
void* data = taosMemoryMalloc(len);
|
|
||||||
if (data == NULL) {
|
|
||||||
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(data, pBody, len);
|
|
||||||
SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
|
|
||||||
|
|
||||||
*pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
|
|
||||||
if (*pItem == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
tqError("%s failed to create data submit for stream since out of memory", id);
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
} else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) {
|
|
||||||
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead));
|
|
||||||
int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);
|
|
||||||
|
|
||||||
code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
|
|
||||||
} else {
|
|
||||||
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo ignore the error in wal?
|
// todo ignore the error in wal?
|
||||||
|
|
Loading…
Reference in New Issue