fix transfer error

This commit is contained in:
yihaoDeng 2023-08-23 21:43:10 +08:00
parent 23252749be
commit 0959758bd5
5 changed files with 90 additions and 9 deletions

View File

@ -137,5 +137,6 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
void* val, int32_t vlen, int64_t ttl, void* tmpBuf); void* val, int32_t vlen, int64_t ttl, void* tmpBuf);
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
int32_t streamBackendTriggerChkp(void* pMeta, char* dst);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif #endif

View File

@ -738,6 +738,7 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
taosMemoryFree(chkpPath); taosMemoryFree(chkpPath);
return 0; return 0;
} }
taosArrayClear(pMeta->chkpSaved);
TdDirPtr pDir = taosOpenDir(chkpPath); TdDirPtr pDir = taosOpenDir(chkpPath);
@ -878,6 +879,49 @@ int32_t chkpPreCheckDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI
return 0; return 0;
} }
int32_t streamBackendTriggerChkp(void* arg, char* dst) {
SStreamMeta* pMeta = arg;
int64_t backendRid = pMeta->streamBackendRid;
int32_t code = -1;
SArray* refs = taosArrayInit(16, sizeof(int64_t));
rocksdb_column_family_handle_t** ppCf = NULL;
int64_t st = taosGetTimestampMs();
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
if (pHandle == NULL || pHandle->db == NULL) {
goto _ERROR;
}
int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, dst, nCf);
code = chkpPreFlushDb(pHandle->db, ppCf, nCf);
if (code == 0) {
code = chkpDoDbCheckpoint(pHandle->db, dst);
if (code != 0) {
qError("stream backend:%p failed to do checkpoint at:%s", pHandle, dst);
} else {
qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, dst,
taosGetTimestampMs() - st);
}
} else {
qError("stream backend:%p failed to flush db at:%s", pHandle, dst);
}
// release all ref to cfWrapper;
for (int i = 0; i < taosArrayGetSize(refs); i++) {
int64_t id = *(int64_t*)taosArrayGet(refs, i);
taosReleaseRef(streamBackendCfWrapperId, id);
}
_ERROR:
taosReleaseRef(streamBackendId, backendRid);
taosArrayDestroy(refs);
return code;
}
int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
SStreamMeta* pMeta = arg; SStreamMeta* pMeta = arg;
int64_t backendRid = pMeta->streamBackendRid; int64_t backendRid = pMeta->streamBackendRid;
@ -902,7 +946,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
// Get all cf and acquire cfWrappter // Get all cf and acquire cfWrappter
int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs); int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, 0); qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf);
code = chkpPreFlushDb(pHandle->db, ppCf, nCf); code = chkpPreFlushDb(pHandle->db, ppCf, nCf);
if (code == 0) { if (code == 0) {
@ -928,6 +972,8 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
// delete obsolte checkpoint // delete obsolte checkpoint
delObsoleteCheckpoint(arg, pChkpDir); delObsoleteCheckpoint(arg, pChkpDir);
// pMeta->chkpId = checkpointId;
} }
_ERROR: _ERROR:

View File

@ -175,8 +175,9 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
// return -1; // return -1;
} }
} }
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
streamBackendLoadCheckpointInfo(pMeta);
return 0; return 0;
} }

View File

@ -16,6 +16,7 @@
#include "streamSnapshot.h" #include "streamSnapshot.h"
#include "query.h" #include "query.h"
#include "rocksdb/c.h" #include "rocksdb/c.h"
#include "streamBackendRocksdb.h"
#include "tcommon.h" #include "tcommon.h"
enum SBackendFileType { enum SBackendFileType {
@ -79,7 +80,7 @@ const char* ROCKSDB_CURRENT = "CURRENT";
const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT"; const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT";
static int64_t kBlockSize = 64 * 1024; static int64_t kBlockSize = 64 * 1024;
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, int64_t chkpId); int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, int64_t chkpId, void* pMeta);
void streamSnapHandleDestroy(SStreamSnapHandle* handle); void streamSnapHandleDestroy(SStreamSnapHandle* handle);
// static void streamBuildFname(char* path, char* file, char* fullname) // static void streamBuildFname(char* path, char* file, char* fullname)
@ -107,19 +108,33 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
return taosOpenFile(fullname, opt); return taosOpenFile(fullname, opt);
} }
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId) { int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) {
// impl later // impl later
int len = strlen(path); int len = strlen(path);
char* tdir = taosMemoryCalloc(1, len + 128); char* tdir = taosMemoryCalloc(1, len + 128);
memcpy(tdir, path, len); memcpy(tdir, path, len);
int32_t code = 0;
if (chkpId != 0) { if (chkpId != 0) {
sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP, sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP,
chkpId); chkpId);
} else { } else {
sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state"); sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state");
char* chkpdir = taosMemoryCalloc(1, len + 256);
sprintf(chkpdir, "%s%s%s", tdir, TD_DIRSEP, "tmp");
taosMemoryFree(tdir);
tdir = chkpdir;
code = streamBackendTriggerChkp(pMeta, tdir);
if (code != 0) {
qError("failed to trigger chekckpoint at %s", tdir);
taosMemoryFree(tdir);
return code;
}
} }
int32_t code = 0; qInfo("start to read dir: %s", tdir);
TdDirPtr pDir = taosOpenDir(tdir); TdDirPtr pDir = taosOpenDir(tdir);
if (NULL == pDir) { if (NULL == pDir) {
@ -156,11 +171,25 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
continue; continue;
} }
if (strlen(name) >= strlen(ROCKSDB_SST) && if (strlen(name) >= strlen(ROCKSDB_SST) &&
0 == strncmp(name - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) { 0 == strncmp(name + strlen(name) - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) {
char* sst = taosStrdup(name); char* sst = taosStrdup(name);
taosArrayPush(pFile->pSst, &sst); taosArrayPush(pFile->pSst, &sst);
} }
} }
{
char* buf = taosMemoryCalloc(1, 512);
sprintf(buf, "current: %s", pFile->pCurrent);
sprintf(buf + strlen(buf), "MANIFEST: %s", pFile->pMainfest);
sprintf(buf + strlen(buf), "options: %s", pFile->pOptions);
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
char* name = taosArrayGetP(pFile->pSst, i);
sprintf(buf + strlen(buf), "sst: %s", name);
}
qInfo("get file list: %s", buf);
taosMemoryFree(buf);
}
taosCloseDir(&pDir); taosCloseDir(&pDir);
if (pFile->pCurrent == NULL) { if (pFile->pCurrent == NULL) {
@ -221,6 +250,12 @@ _err:
void streamSnapHandleDestroy(SStreamSnapHandle* handle) { void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
SBanckendFile* pFile = handle->pBackendFile; SBanckendFile* pFile = handle->pBackendFile;
if (handle->checkpointId == 0) {
if (taosIsDir(pFile->path)) {
taosRemoveDir(pFile->path);
}
}
if (pFile) { if (pFile) {
taosMemoryFree(pFile->pCheckpointMeta); taosMemoryFree(pFile->pCheckpointMeta);
taosMemoryFree(pFile->pCurrent); taosMemoryFree(pFile->pCurrent);
@ -234,7 +269,6 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
taosArrayDestroy(pFile->pSst); taosArrayDestroy(pFile->pSst);
taosMemoryFree(pFile); taosMemoryFree(pFile);
} }
taosArrayDestroy(handle->pFileList); taosArrayDestroy(handle->pFileList);
taosCloseFile(&handle->fd); taosCloseFile(&handle->fd);
return; return;
@ -247,7 +281,7 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* pa
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
if (streamSnapHandleInit(&pReader->handle, (char*)path, chkpId) < 0) { if (streamSnapHandleInit(&pReader->handle, (char*)path, chkpId, pMeta) < 0) {
taosMemoryFree(pReader); taosMemoryFree(pReader);
return -1; return -1;
} }

View File

@ -262,7 +262,6 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool);
#define ASYNC_CHECK_HANDLE(exh1, id) \ #define ASYNC_CHECK_HANDLE(exh1, id) \
do { \ do { \
if (id > 0) { \ if (id > 0) { \
tTrace("handle step1"); \
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \ SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \
if (exh2 == NULL || id != exh2->refId) { \ if (exh2 == NULL || id != exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, \ tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, \