refactor log

This commit is contained in:
yihaoDeng 2023-08-24 10:15:15 +08:00
parent 0959758bd5
commit 3afea998ac
4 changed files with 57 additions and 42 deletions

View File

@ -16,6 +16,8 @@
#define _STREAM_BACKEDN_SNAPSHOT_H_ #define _STREAM_BACKEDN_SNAPSHOT_H_
#include "tcommon.h" #include "tcommon.h"
#define STREAM_STATE_TRANSFER "stream-state-transfer"
typedef struct SStreamSnapReader SStreamSnapReader; typedef struct SStreamSnapReader SStreamSnapReader;
typedef struct SStreamSnapWriter SStreamSnapWriter; typedef struct SStreamSnapWriter SStreamSnapWriter;

View File

@ -26,7 +26,7 @@ struct SStreamStateReader {
TBC* pCur; TBC* pCur;
SStreamSnapReader* pReaderImpl; SStreamSnapReader* pReaderImpl;
int32_t complete; int32_t complete; // open reader or not
}; };
int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader) { int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader) {
@ -60,26 +60,29 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
} }
pReader->pReaderImpl = pSnapReader; pReader->pReaderImpl = pSnapReader;
tqDebug("vgId:%d, vnode stream-state snapshot reader opened", TD_VID(pTq->pVnode)); tqDebug("vgId:%d, vnode %s snapshot reader opened", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER);
*ppReader = pReader; *ppReader = pReader;
return code; return code;
_err: _err:
tqError("vgId:%d, vnode stream-state snapshot reader failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code)); tqError("vgId:%d, vnode %s snapshot reader failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
tstrerror(code));
*ppReader = NULL; *ppReader = NULL;
return code; return code;
} }
int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) { int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) {
int32_t code = 0; int32_t code = 0;
tqDebug("vgId:%d, vnode stream-state snapshot reader closed", TD_VID(pReader->pTq->pVnode)); tqDebug("vgId:%d, vnode %s snapshot reader closed", TD_VID(pReader->pTq->pVnode), STREAM_STATE_TRANSFER);
streamSnapReaderClose(pReader->pReaderImpl); streamSnapReaderClose(pReader->pReaderImpl);
taosMemoryFree(pReader); taosMemoryFree(pReader);
return code; return code;
} }
int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) { int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) {
tqDebug("vgId:%d, vnode %s snapshot read data", TD_VID(pReader->pTq->pVnode), STREAM_STATE_TRANSFER);
int32_t code = 0; int32_t code = 0;
if (pReader->complete == 0) { if (pReader->complete == 0) {
return 0; return 0;
@ -143,13 +146,14 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
goto _err; goto _err;
} }
tqDebug("vgId:%d, vnode stream-state snapshot writer opened, path:%s", TD_VID(pTq->pVnode), tdir); tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, tdir);
pWriter->pWriterImpl = pSnapWriter; pWriter->pWriterImpl = pSnapWriter;
*ppWriter = pWriter; *ppWriter = pWriter;
return code; return code;
_err: _err:
tqError("vgId:%d, vnode stream-state snapshot writer failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code)); tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
tstrerror(code));
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
*ppWriter = NULL; *ppWriter = NULL;
return -1; return -1;
@ -157,16 +161,18 @@ _err:
int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) { int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) {
int32_t code = 0; int32_t code = 0;
tqDebug("vgId:%d, vnode stream-state snapshot writer closed", TD_VID(pWriter->pTq->pVnode)); tqDebug("vgId:%d, vnode %s snapshot writer closed", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
code = streamSnapWriterClose(pWriter->pWriterImpl, rollback); code = streamSnapWriterClose(pWriter->pWriterImpl, rollback);
return code; return code;
} }
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) { int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId); int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId);
if (code == 0) { if (code == 0) {
code = streamStateLoadTasks(pWriter); code = streamStateLoadTasks(pWriter);
} }
tqDebug("vgId:%d, vnode %s succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
return code; return code;
} }
@ -174,6 +180,6 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId)
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { return streamLoadTasks(pWriter->pTq->pStreamMeta); } int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { return streamLoadTasks(pWriter->pTq->pStreamMeta); }
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
tqDebug("vgId:%d, vnode stream-state snapshot write", TD_VID(pWriter->pTq->pVnode)); tqDebug("vgId:%d, vnode %s snapshot write data", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
return streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); return streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
} }

View File

@ -792,12 +792,6 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*
if (wrapper->pHandle[i]) { if (wrapper->pHandle[i]) {
rocksdb_column_family_handle_t* p = wrapper->pHandle[i]; rocksdb_column_family_handle_t* p = wrapper->pHandle[i];
taosArrayPush(pHandle, &p); taosArrayPush(pHandle, &p);
// size_t len = 0;
// char* name = rocksdb_column_family_handle_get_name(p, &len);
// char buf[64] = {0};
// memcpy(buf, name, len);
// qError("column name: name: %s, len: %d", buf, (int)len);
// taosMemoryFree(name);
} }
} }
taosThreadRwlockUnlock(&wrapper->rwLock); taosThreadRwlockUnlock(&wrapper->rwLock);
@ -972,8 +966,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
// delete obsolte checkpoint // delete obsolte checkpoint
delObsoleteCheckpoint(arg, pChkpDir); delObsoleteCheckpoint(arg, pChkpDir);
pMeta->chkpId = checkpointId;
// pMeta->chkpId = checkpointId;
} }
_ERROR: _ERROR:

View File

@ -111,34 +111,45 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) { int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) {
// impl later // impl later
int len = strlen(path); int len = strlen(path);
char* tdir = taosMemoryCalloc(1, len + 128); char* tdir = taosMemoryCalloc(1, len + 256);
memcpy(tdir, path, len); memcpy(tdir, path, len);
int32_t code = 0; int32_t code = 0;
int8_t chkpFlag = 0;
if (chkpId != 0) { if (chkpId != 0) {
sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP, sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP,
chkpId); chkpId);
if (taosIsDir(tdir)) {
chkpFlag = 1;
qInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir);
} else {
qWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, tdir);
}
}
} else { if (chkpFlag == 0) {
sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state"); sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state");
char* chkpdir = taosMemoryCalloc(1, len + 256); char* chkpdir = taosMemoryCalloc(1, len + 256);
sprintf(chkpdir, "%s%s%s", tdir, TD_DIRSEP, "tmp"); sprintf(chkpdir, "%s%s%s", tdir, TD_DIRSEP, "tmp");
taosMemoryFree(tdir); taosMemoryFree(tdir);
tdir = chkpdir; tdir = chkpdir;
qInfo("%s start to trigger checkpoint on %s", STREAM_STATE_TRANSFER, tdir);
code = streamBackendTriggerChkp(pMeta, tdir); code = streamBackendTriggerChkp(pMeta, tdir);
if (code != 0) { if (code != 0) {
qError("failed to trigger chekckpoint at %s", tdir); qError("%s failed to trigger chekckpoint at %s", STREAM_STATE_TRANSFER, tdir);
taosMemoryFree(tdir); taosMemoryFree(tdir);
return code; return code;
} }
} }
qInfo("start to read dir: %s", tdir);
qInfo("%s start to read dir: %s", STREAM_STATE_TRANSFER, tdir);
TdDirPtr pDir = taosOpenDir(tdir); TdDirPtr pDir = taosOpenDir(tdir);
if (NULL == pDir) { if (NULL == pDir) {
qError("stream-state failed to open %s", tdir); qError("%s failed to open %s", STREAM_STATE_TRANSFER, tdir);
goto _err; goto _err;
} }
@ -178,22 +189,24 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
} }
{ {
char* buf = taosMemoryCalloc(1, 512); char* buf = taosMemoryCalloc(1, 512);
sprintf(buf, "current: %s", pFile->pCurrent); sprintf(buf, "[current: %s,", pFile->pCurrent);
sprintf(buf + strlen(buf), "MANIFEST: %s", pFile->pMainfest); sprintf(buf + strlen(buf), "MANIFEST: %s,", pFile->pMainfest);
sprintf(buf + strlen(buf), "options: %s", pFile->pOptions); sprintf(buf + strlen(buf), "options: %s,", pFile->pOptions);
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
char* name = taosArrayGetP(pFile->pSst, i); char* name = taosArrayGetP(pFile->pSst, i);
sprintf(buf + strlen(buf), "sst: %s", name); sprintf(buf + strlen(buf), "%s,", name);
} }
qInfo("get file list: %s", buf); sprintf(buf + strlen(buf) - 1, "]");
qInfo("%s get file list: %s", STREAM_STATE_TRANSFER, buf);
taosMemoryFree(buf); taosMemoryFree(buf);
} }
taosCloseDir(&pDir); taosCloseDir(&pDir);
if (pFile->pCurrent == NULL) { if (pFile->pCurrent == NULL) {
qError("stream-state failed to open %s, reason: no valid file", tdir); qError("%s failed to open %s, reason: no valid file", STREAM_STATE_TRANSFER, tdir);
code = -1; code = -1;
tdir = NULL; tdir = NULL;
goto _err; goto _err;
@ -313,24 +326,24 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
return 0; return 0;
} else { } else {
pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ); pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
qDebug("stream-state open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", item->name, qDebug("%s open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx); item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
} }
} }
qDebug("stream-state start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", item->name, qDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx); item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
if (nread == -1) { if (nread == -1) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
qError("stream-state snap failed to read snap, file name:%s, type:%d,reason:%s", item->name, item->type, qError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name,
tstrerror(code)); item->type, tstrerror(code));
return -1; return -1;
} else if (nread > 0 && nread <= kBlockSize) { } else if (nread > 0 && nread <= kBlockSize) {
// left bytes less than kBlockSize // left bytes less than kBlockSize
qDebug("stream-state read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", item->name, qDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx); item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
pHandle->offset += nread; pHandle->offset += nread;
if (pHandle->offset >= item->size || nread < kBlockSize) { if (pHandle->offset >= item->size || nread < kBlockSize) {
taosCloseFile(&pHandle->fd); taosCloseFile(&pHandle->fd);
@ -338,7 +351,8 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
pHandle->currFileIdx += 1; pHandle->currFileIdx += 1;
} }
} else { } else {
qDebug("stream-state no data read, close file no.%d, move to next file, open and read", pHandle->currFileIdx); qDebug("%s no data read, close file no.%d, move to next file, open and read", STREAM_STATE_TRANSFER,
pHandle->currFileIdx);
taosCloseFile(&pHandle->fd); taosCloseFile(&pHandle->fd);
pHandle->offset = 0; pHandle->offset = 0;
pHandle->currFileIdx += 1; pHandle->currFileIdx += 1;
@ -355,8 +369,8 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
pHandle->offset += nread; pHandle->offset += nread;
qDebug("stream-state open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", qDebug("%s open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d",
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
} }
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf; SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;
@ -411,7 +425,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) { if (pHandle->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name, qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name,
tstrerror(code)); tstrerror(code));
} }
} }
@ -420,7 +434,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
int64_t bytes = taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset); int64_t bytes = taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
if (bytes != pHdr->size) { if (bytes != pHdr->size) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
qError("stream-state failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code)); qError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code));
return code; return code;
} }
pHandle->offset += bytes; pHandle->offset += bytes;
@ -438,7 +452,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) { if (pHandle->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name, qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name,
tstrerror(code)); tstrerror(code));
} }
@ -462,7 +476,7 @@ int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) {
n += sprintf(buf + n, "%s %" PRId64 "]", item->name, item->size); n += sprintf(buf + n, "%s %" PRId64 "]", item->name, item->size);
} }
} }
qDebug("stream snap get file list, %s", buf); qDebug("%s snap get file list, %s", STREAM_STATE_TRANSFER, buf);
taosMemoryFree(buf); taosMemoryFree(buf);
} }