Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2

This commit is contained in:
Haojun Liao 2023-08-11 10:37:40 +08:00
commit 1e636a2ef9
2 changed files with 19 additions and 7 deletions

View File

@ -220,20 +220,29 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
} }
// STREAM ============ // STREAM ============
vInfo("stream task start");
if (!pReader->streamTaskDone) { if (!pReader->streamTaskDone) {
if (pReader->pStreamTaskReader == NULL) { if (pReader->pStreamTaskReader == NULL) {
vInfo("stream task start 1");
code = streamTaskSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamTaskReader); code = streamTaskSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamTaskReader);
if (code) goto _err; if (code) {
vInfo("stream task start err");
goto _err;
}
} }
code = streamTaskSnapRead(pReader->pStreamTaskReader, ppData); code = streamTaskSnapRead(pReader->pStreamTaskReader, ppData);
vInfo("stream task start 2");
if (code) { if (code) {
vInfo("stream task start 3");
goto _err; goto _err;
} else { } else {
if (*ppData) { if (*ppData) {
goto _exit; goto _exit;
vInfo("stream task start 4");
} else { } else {
pReader->streamTaskDone = 1; pReader->streamTaskDone = 1;
code = streamTaskSnapReaderClose(pReader->pStreamTaskReader); code = streamTaskSnapReaderClose(pReader->pStreamTaskReader);
vInfo("stream task start 5");
if (code) goto _err; if (code) goto _err;
pReader->pStreamTaskReader = NULL; pReader->pStreamTaskReader = NULL;
} }

View File

@ -43,7 +43,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
if (pMeta == NULL) { if (pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
qError("vgId:%d failed to prepare stream meta, alloc size:%"PRIzu", out of memory", vgId, sizeof(SStreamMeta)); qError("vgId:%d failed to prepare stream meta, alloc size:%" PRIzu ", out of memory", vgId, sizeof(SStreamMeta));
return NULL; return NULL;
} }
@ -122,7 +122,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
taosInitRWLatch(&pMeta->lock); taosInitRWLatch(&pMeta->lock);
taosThreadMutexInit(&pMeta->backendMutex, NULL); taosThreadMutexInit(&pMeta->backendMutex, NULL);
qInfo("vgId:%d open stream meta successfully, latest checkpoint:%"PRId64, vgId, chkpId); qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64, vgId, chkpId);
return pMeta; return pMeta;
_err: _err:
@ -390,6 +390,9 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) {
tdbTbcMoveToFirst(pCur); tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
if (pVal != NULL && vLen != 0) {
break;
}
SCheckpointInfo info; SCheckpointInfo info;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) { if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) {