Merge pull request #22558 from taosdata/fix/TD-25920

fix(stream): continue check wal when meeting empty delete block msg.
This commit is contained in:
Haojun Liao 2023-08-24 18:28:15 +08:00 committed by GitHub
commit f7baadb2a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 62 additions and 45 deletions

View File

@ -296,27 +296,33 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
} }
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) { int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
int32_t code = walNextValidMsg(pReader); int32_t code = 0;
while(1) {
code = walNextValidMsg(pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
int64_t ver = pReader->pHead->head.version; SWalCont* pCont = &pReader->pHead->head;
int64_t ver = pCont->version;
if (ver > maxVer) { if (ver > maxVer) {
tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id); tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) { if (pCont->msgType == TDMT_VND_SUBMIT) {
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg)); void* pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);
void* data = taosMemoryMalloc(len); void* data = taosMemoryMalloc(len);
if (data == NULL) { if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
terrno = code;
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
return -1; return code;
} }
memcpy(data, pBody, len); memcpy(data, pBody, len);
@ -324,25 +330,36 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
*pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT); *pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
if (*pItem == NULL) { if (*pItem == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
terrno = code;
tqError("%s failed to create data submit for stream since out of memory", id); tqError("%s failed to create data submit for stream since out of memory", id);
return terrno; return code;
} }
} else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) { } else if (pCont->msgType == TDMT_VND_DELETE) {
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead)); void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead); int32_t len = pCont->bodyLen - sizeof(SMsgHead);
code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
if (code != TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code)); if (*pItem == NULL) {
tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
// we need to continue check next data in the wal files.
continue;
} else { } else {
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver); tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
} }
} else {
terrno = code;
tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
return code;
}
} else { } else {
ASSERT(0); ASSERT(0);
} }
return 0; return code;
}
} }
// todo ignore the error in wal? // todo ignore the error in wal?