Merge branch 'enh/chkpTransfer' into enh/triggerCheckPoint2

This commit is contained in:
yihaoDeng 2023-08-16 09:00:30 +00:00
commit ab7e2b3122
12 changed files with 644 additions and 244 deletions

View File

@ -14,13 +14,13 @@
*/ */
#include "os.h" #include "os.h"
#include "ttimer.h"
#include "streamState.h" #include "streamState.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tdbInt.h" #include "tdbInt.h"
#include "tmsg.h" #include "tmsg.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#include "tqueue.h" #include "tqueue.h"
#include "ttimer.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -389,10 +389,12 @@ typedef struct SStreamMeta {
SMgmtInfo mgmtInfo; SMgmtInfo mgmtInfo;
int32_t chkptNotReadyTasks; int32_t chkptNotReadyTasks;
SArray* checkpointSaved;
SArray* checkpointInUse; int64_t chkpId;
int32_t checkpointCap; SArray* chkpSaved;
SRWLatch checkpointDirLock; SArray* chkpInUse;
int32_t chkpCap;
SRWLatch chkpDirLock;
} SStreamMeta; } SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
@ -694,6 +696,9 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
// int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId);
int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId);
int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta); int32_t streamLoadTasks(SStreamMeta* pMeta);

View File

@ -200,8 +200,11 @@ void taosArrayClear(SArray* pArray);
* @param pArray * @param pArray
* @param fp * @param fp
*/ */
void taosArrayClearEx(SArray* pArray, void (*fp)(void*)); void taosArrayClearEx(SArray* pArray, void (*fp)(void*));
void taosArrayClearP(SArray* pArray, void (*fp)(void*));
void* taosArrayDestroy(SArray* pArray); void* taosArrayDestroy(SArray* pArray);
void taosArrayDestroyP(SArray* pArray, FDelete fp); void taosArrayDestroyP(SArray* pArray, FDelete fp);

View File

@ -48,6 +48,7 @@ typedef struct SStreamVnodeRevertIndex {
static int32_t mndNodeCheckSentinel = 0; static int32_t mndNodeCheckSentinel = 0;
static SStreamVnodeRevertIndex execNodeList; static SStreamVnodeRevertIndex execNodeList;
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
@ -1060,7 +1061,6 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
} }
} }
pStream->checkpointFreq = checkpointId;
pStream->checkpointId = checkpointId; pStream->checkpointId = checkpointId;
pStream->checkpointFreq = taosGetTimestampMs(); pStream->checkpointFreq = taosGetTimestampMs();
atomic_store_64(&pStream->currentTick, 0); atomic_store_64(&pStream->currentTick, 0);
@ -1097,7 +1097,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
int64_t checkpointId = pMsg->checkpointId; int64_t checkpointId = pMsg->checkpointId;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, MND_STREAM_CHECKPOINT_NAME);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1; return -1;

View File

@ -331,6 +331,10 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData);
int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter); int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter);
int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback); int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback);
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId);
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter);
// SStreamTaskReader ====================================== // SStreamTaskReader ======================================
// SStreamStateWriter ===================================== // SStreamStateWriter =====================================
// SStreamStateReader ===================================== // SStreamStateReader =====================================

View File

@ -41,13 +41,17 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
SStreamMeta* meta = pTq->pStreamMeta;
pReader->pTq = pTq; pReader->pTq = pTq;
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
int64_t chkpId = meta ? meta->chkpId : 0;
SStreamSnapReader* pSnapReader = NULL; SStreamSnapReader* pSnapReader = NULL;
sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "checkpoints");
if (streamSnapReaderOpen(pTq, sver, ever, tdir, &pSnapReader) == 0) { if (streamSnapReaderOpen(pTq, sver, chkpId, pTq->path, &pSnapReader) == 0) {
pReader->complete = 1; pReader->complete = 1;
} else { } else {
code = -1; code = -1;
@ -131,14 +135,18 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
pWriter->sver = sver; pWriter->sver = sver;
pWriter->ever = ever; pWriter->ever = ever;
sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM); sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "received");
taosMkDir(tdir);
SStreamSnapWriter* pSnapWriter = NULL; SStreamSnapWriter* pSnapWriter = NULL;
if (streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter) < 0) { if (streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter) < 0) {
goto _err; goto _err;
} }
tqDebug("vgId:%d, vnode stream-state snapshot writer opened", TD_VID(pTq->pVnode)); tqDebug("vgId:%d, vnode stream-state snapshot writer opened, path:%s", TD_VID(pTq->pVnode), tdir);
pWriter->pWriterImpl = pSnapWriter; pWriter->pWriterImpl = pSnapWriter;
*ppWriter = pWriter;
return code; return code;
_err: _err:
tqError("vgId:%d, vnode stream-state snapshot writer failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code)); tqError("vgId:%d, vnode stream-state snapshot writer failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code));
@ -154,10 +162,17 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
return code; return code;
} }
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId);
int32_t code = 0; if (code == 0) {
tqDebug("vgId:%d, vnode stream-state snapshot write", TD_VID(pWriter->pTq->pVnode)); code = streamStateLoadTasks(pWriter);
code = streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); }
return code; return code;
} }
/*
 * Run streamLoadTasks() on the stream meta reachable from the writer's TQ.
 *
 * @param pWriter stream-state snapshot writer; its pTq->pStreamMeta is used.
 * @return whatever streamLoadTasks() returns.
 */
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) {
  SStreamMeta* pMeta = pWriter->pTq->pStreamMeta;
  return streamLoadTasks(pMeta);
}
/*
 * Forward one snapshot packet to the underlying stream snapshot writer.
 *
 * The packet starts with an SSnapDataHdr; only the bytes after that header
 * are handed to streamSnapWrite().
 *
 * @param pWriter stream-state snapshot writer (its pWriterImpl receives the payload)
 * @param pData   packet buffer, header included
 * @param nData   total packet size in bytes, header included
 * @return result of streamSnapWrite(), or -1 when the packet is missing or
 *         shorter than the header.
 */
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
  tqDebug("vgId:%d, vnode stream-state snapshot write", TD_VID(pWriter->pTq->pVnode));

  // nData is unsigned: without this guard a short packet would wrap the
  // payload length around to a huge value.
  if (pData == NULL || nData < sizeof(SSnapDataHdr)) {
    tqError("vgId:%d, vnode stream-state snapshot write got invalid packet, size:%u", TD_VID(pWriter->pTq->pVnode),
            nData);
    return -1;
  }

  return streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
}

View File

@ -248,31 +248,30 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
} }
} }
} }
// if (!pReader->streamStateDone) { if (!pReader->streamStateDone) {
// if (pReader->pStreamStateReader == NULL) { if (pReader->pStreamStateReader == NULL) {
// code = code =
// streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamStateReader);
// &pReader->pStreamStateReader); if (code) {
// if (code) { pReader->streamStateDone = 1;
// pReader->streamStateDone = 1; pReader->pStreamStateReader = NULL;
// pReader->pStreamStateReader = NULL; goto _err;
// goto _err; }
// } }
// } code = streamStateSnapRead(pReader->pStreamStateReader, ppData);
// code = streamStateSnapRead(pReader->pStreamStateReader, ppData); if (code) {
// if (code) { goto _err;
// goto _err; } else {
// } else { if (*ppData) {
// if (*ppData) { goto _exit;
// goto _exit; } else {
// } else { pReader->streamStateDone = 1;
// pReader->streamStateDone = 1; code = streamStateSnapReaderClose(pReader->pStreamStateReader);
// code = streamStateSnapReaderClose(pReader->pStreamStateReader); if (code) goto _err;
// if (code) goto _err; pReader->pStreamStateReader = NULL;
// pReader->pStreamStateReader = NULL; }
// } }
// } }
// }
// RSMA ============== // RSMA ==============
if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) { if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) {
@ -419,6 +418,9 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
if (pWriter->pStreamStateWriter) { if (pWriter->pStreamStateWriter) {
code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback); code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback);
if (code) goto _exit; if (code) goto _exit;
code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, 0);
if (code) goto _exit;
} }
if (pWriter->pRsmaSnapWriter) { if (pWriter->pRsmaSnapWriter) {
@ -527,7 +529,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
code = streamTaskSnapWrite(pWriter->pStreamTaskWriter, pData, nData); code = streamTaskSnapWrite(pWriter->pStreamTaskWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
} break; } break;
case SNAP_DATA_STREAM_STATE: { case SNAP_DATA_STREAM_STATE_BACKEND: {
if (pWriter->pStreamStateWriter == NULL) { if (pWriter->pStreamStateWriter == NULL) {
code = streamStateSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamStateWriter); code = streamStateSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamStateWriter);
if (code) goto _err; if (code) goto _err;

View File

@ -20,6 +20,27 @@
#include "tcommon.h" #include "tcommon.h"
#include "tref.h" #include "tref.h"
// Bookkeeping for incremental transfer of backend checkpoint directories:
// bkdMgtGetDelta() scans "<path>/checkpoint<id>" and records which files were
// added or removed since the previous scan; bkdMgtDumpTo() applies that delta
// to another directory under <path>.
typedef struct {
// 0 until bkdMgtGetDelta() has completed its first scan, then 1
int8_t init;
// name of the "CURRENT" entry seen by the latest scan (taosStrdup'ed)
char* pCurrent;
// name of the "MANIFEST-*" entry seen by the latest scan (taosStrdup'ed)
char* pManifest;
// array of pointer-sized elements; created and destroyed with the manager,
// not otherwise used by the functions visible in this file section
SArray* pSST;
// checkpoint id of the previous scan; set to -1 by the first scan
int64_t preCkptId;
// checkpoint id passed to the most recent bkdMgtGetDelta()
int64_t curChkpId;
// copy of the path given to bkdMgtCreate()
char* path;
// scratch buffer used to format the checkpoint directory path
char* buf;
// size of buf in bytes (strlen(path) + 128)
int32_t len;
// ping-pong buf
// pSstTbl[idx] holds the file names of the previous scan, pSstTbl[1 - idx]
// is filled by the current scan; idx flips at the end of each scan
SHashObj* pSstTbl[2];
int8_t idx;
// file names present in the current scan but not in the previous one
SArray* pAdd;
// file names present in the previous scan but not in the current one
SArray* pDel;
// set to 1 when the first scan finds files, to 0 when a later scan finds no change
int8_t update;
} SBackendManager;
typedef struct SCompactFilteFactory { typedef struct SCompactFilteFactory {
void* status; void* status;
} SCompactFilteFactory; } SCompactFilteFactory;
@ -127,6 +148,218 @@ void destroyFunc(void* arg);
int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest); int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest);
int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest); int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest);
/**
 * Allocate and initialise a backend manager rooted at `path`.
 *
 * @param path directory under which the "checkpoint<id>" directories are
 *             looked up; the string is copied, the caller keeps ownership.
 * @return the new manager, or NULL when `path` is NULL or any allocation
 *         fails. Release it with bkdMgtDestroy().
 */
SBackendManager* bkdMgtCreate(char* path) {
  if (path == NULL) return NULL;

  // calloc leaves init, idx, update, both checkpoint ids, pCurrent and
  // pManifest zeroed, so only the owned resources need to be set up.
  SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager));
  if (p == NULL) return NULL;

  p->pSST = taosArrayInit(64, sizeof(void*));
  p->path = taosStrdup(path);
  p->len = strlen(path) + 128;  // path plus room for a "checkpoint<id>" suffix
  p->buf = taosMemoryCalloc(1, p->len);
  p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  p->pAdd = taosArrayInit(64, sizeof(void*));
  p->pDel = taosArrayInit(64, sizeof(void*));

  if (p->pSST == NULL || p->path == NULL || p->buf == NULL || p->pSstTbl[0] == NULL || p->pSstTbl[1] == NULL ||
      p->pAdd == NULL || p->pDel == NULL) {
    // undo the partial construction; all containers are still empty here
    if (p->pSST != NULL) taosArrayDestroy(p->pSST);
    if (p->pAdd != NULL) taosArrayDestroy(p->pAdd);
    if (p->pDel != NULL) taosArrayDestroy(p->pDel);
    if (p->pSstTbl[0] != NULL) taosHashCleanup(p->pSstTbl[0]);
    if (p->pSstTbl[1] != NULL) taosHashCleanup(p->pSstTbl[1]);
    taosMemoryFreeClear(p->buf);
    taosMemoryFreeClear(p->path);
    taosMemoryFree(p);
    return NULL;
  }

  return p;
}
/**
 * Release a backend manager and everything it owns.
 *
 * @param bm manager returned by bkdMgtCreate(); NULL is accepted and ignored.
 */
void bkdMgtDestroy(SBackendManager* bm) {
  if (bm == NULL) return;

  taosMemoryFree(bm->buf);
  taosMemoryFree(bm->path);

  // Both names are taosStrdup'ed by bkdMgtGetDelta() and were never released
  // before; either may still be NULL if no scan has found such a file.
  taosMemoryFreeClear(bm->pCurrent);
  taosMemoryFreeClear(bm->pManifest);

  taosArrayDestroyP(bm->pSST, taosMemoryFree);
  taosArrayDestroyP(bm->pAdd, taosMemoryFree);
  taosArrayDestroyP(bm->pDel, taosMemoryFree);
  taosHashCleanup(bm->pSstTbl[0]);
  taosHashCleanup(bm->pSstTbl[1]);
  taosMemoryFree(bm);
}
/**
 * Collect the keys that are present in `p2` but missing from `p1`.
 *
 * Each missing key is copied into a freshly allocated, NUL-terminated string
 * and a pointer to that copy is pushed onto `diff`; the caller owns the copies.
 *
 * @param p1   reference table
 * @param p2   table whose keys are checked against `p1`
 * @param diff array of char* receiving the copies
 * @return 0 on success, -1 if a key could not be copied (entries pushed
 *         before the failure stay in `diff`).
 */
int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
  int32_t code = 0;
  size_t  len = 0;
  void*   pIter = taosHashIterate(p2, NULL);
  while (pIter) {
    char* name = taosHashGetKey(pIter, &len);
    if (code == 0 && name != NULL && !taosHashGet(p1, name, len)) {
      // The key comes with an explicit length, so copy exactly `len` bytes
      // and terminate the copy ourselves instead of relying on a trailing NUL.
      char* p = taosMemoryCalloc(1, len + 1);
      if (p == NULL) {
        code = -1;
      } else {
        memcpy(p, name, len);
        taosArrayPush(diff, &p);
      }
    }
    // NOTE(review): after a failure the loop keeps walking (without copying)
    // so the iterator ends the same way it does on success — confirm whether
    // an explicit cancel call would be preferable.
    pIter = taosHashIterate(p2, pIter);
  }
  return code;
}
/**
 * Compute the two-way difference between hash tables `p1` and `p2`.
 *
 * @param p1  older table
 * @param p2  newer table
 * @param add receives the keys found in `p2` but not in `p1`
 * @param del receives the keys found in `p1` but not in `p2`
 * @return 0 on success, otherwise the first non-zero code reported by
 *         compareHashTableImpl().
 */
int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) {
  int32_t code = compareHashTableImpl(p1, p2, add);
  if (code != 0) {
    // do not let the second pass overwrite the first failure
    return code;
  }
  return compareHashTableImpl(p2, p1, del);
}
/**
 * Scan "<bm->path>/checkpoint<chkpId>" and work out which files were added or
 * removed since the previous scan.
 *
 * Only entries named "CURRENT", starting with "MANIFEST-" or ending in ".sst"
 * are tracked. On the first call every tracked file is reported in bm->pAdd;
 * on later calls bm->pAdd / bm->pDel receive the difference against the
 * previous scan. Entries of both arrays are heap copies owned by the manager.
 *
 * @param bm     backend manager
 * @param chkpId checkpoint id whose directory is scanned
 * @param list   unused
 * @return 0 on success; non-zero if the directory cannot be opened or a name
 *         cannot be copied, in which case the delta arrays are emptied and
 *         the previous scan stays the reference.
 */
int32_t bkdMgtGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) {
  const char* pCurrent = "CURRENT";
  const char* pManifest = "MANIFEST-";
  size_t      maniLen = strlen(pManifest);
  const char* pSST = ".sst";
  size_t      sstLen = strlen(pSST);
  int8_t      dummy = 0;
  int32_t     code = 0;

  (void)list;

  SHashObj* pPrev = bm->pSstTbl[bm->idx];
  SHashObj* pCurr = bm->pSstTbl[1 - bm->idx];

  memset(bm->buf, 0, bm->len);
  snprintf(bm->buf, (size_t)bm->len, "%s%scheckpoint%" PRId64 "", bm->path, TD_DIRSEP, chkpId);

  taosArrayClearP(bm->pAdd, taosMemoryFree);
  taosArrayClearP(bm->pDel, taosMemoryFree);

  TdDirPtr pDir = taosOpenDir(bm->buf);
  if (pDir == NULL) {
    return -1;
  }

  TdDirEntryPtr de = NULL;
  while ((de = taosReadDir(pDir)) != NULL) {
    char* name = taosGetDirEntryName(de);
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;

    size_t nameLen = strlen(name);
    if (strcmp(name, pCurrent) == 0) {
      taosMemoryFreeClear(bm->pCurrent);
      bm->pCurrent = taosStrdup(name);
    } else if (nameLen >= maniLen && strncmp(name, pManifest, maniLen) == 0) {
      taosMemoryFreeClear(bm->pManifest);
      bm->pManifest = taosStrdup(name);
    } else if (nameLen >= sstLen && strcmp(name + nameLen - sstLen, pSST) == 0) {
      // plain sst file, only recorded in the table below
    } else {
      continue;  // not a file this manager tracks
    }
    taosHashPut(pCurr, name, nameLen, &dummy, sizeof(dummy));
  }
  taosCloseDir(&pDir);

  if (bm->init == 0) {
    // First scan: everything found counts as added. pAdd entries are released
    // with taosMemoryFree, so store private copies rather than pointers into
    // the hash table's own key storage.
    void* pIter = taosHashIterate(pCurr, NULL);
    while (pIter) {
      size_t len = 0;
      char*  name = taosHashGetKey(pIter, &len);
      if (code == 0 && name != NULL && len != 0) {
        char* copy = taosMemoryCalloc(1, len + 1);
        if (copy == NULL) {
          code = -1;
        } else {
          memcpy(copy, name, len);
          taosArrayPush(bm->pAdd, &copy);
        }
      }
      pIter = taosHashIterate(pCurr, pIter);
    }
    if (code == 0) {
      bm->preCkptId = -1;
      bm->curChkpId = chkpId;
      bm->init = 1;
      if (taosArrayGetSize(bm->pAdd) > 0) bm->update = 1;
    }
  } else {
    code = compareHashTable(pPrev, pCurr, bm->pAdd, bm->pDel);
    if (code == 0) {
      bm->preCkptId = bm->curChkpId;
      bm->curChkpId = chkpId;
      // reflect the current delta in both directions, so a change that
      // follows an unchanged scan is flagged again
      bm->update = (taosArrayGetSize(bm->pAdd) > 0 || taosArrayGetSize(bm->pDel) > 0) ? 1 : 0;
    }
  }

  if (code != 0) {
    // drop the half-built result and keep the previous scan as reference
    taosArrayClearP(bm->pAdd, taosMemoryFree);
    taosArrayClearP(bm->pDel, taosMemoryFree);
    taosHashClear(pCurr);
    bm->update = 0;
    return code;
  }

  // the table just filled becomes the reference for the next scan
  taosHashClear(pPrev);
  bm->idx = 1 - bm->idx;
  return 0;
}
int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) {
int32_t code = 0;
int32_t len = bm->len + 128;
char* dstBuf = taosMemoryCalloc(1, len);
char* srcBuf = taosMemoryCalloc(1, len);
char* srcDir = taosMemoryCalloc(1, len);
char* dstDir = taosMemoryCalloc(1, len);
sprintf(srcDir, "%s%s%s%" PRId64 "", bm->path, TD_DIRSEP, "checkpoint", bm->curChkpId);
sprintf(dstDir, "%s%s%s", bm->path, TD_DIRSEP, dname);
if (!taosDirExist(srcDir)) {
return 0;
}
code = taosMkDir(dstDir);
if (code != 0) {
return code;
}
// clear current file
memset(dstBuf, 0, len);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pCurrent);
taosRemoveFile(dstBuf);
memset(dstBuf, 0, len);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pManifest);
taosRemoveFile(dstBuf);
// add file to $name dir
for (int i = 0; i < taosArrayGetSize(bm->pAdd); i++) {
memset(dstBuf, 0, len);
memset(srcBuf, 0, len);
char* filename = taosArrayGetP(bm->pAdd, i);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename);
taosCopyFile(srcBuf, dstBuf);
}
// del file in $name
for (int i = 0; i < taosArrayGetSize(bm->pDel); i++) {
memset(dstBuf, 0, len);
memset(srcBuf, 0, len);
char* filename = taosArrayGetP(bm->pDel, i);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename);
taosRemoveFile(dstBuf);
}
// copy current file to dst dir
memset(srcBuf, 0, len);
memset(dstBuf, 0, len);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, bm->pCurrent);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pCurrent);
taosCopyFile(srcBuf, dstBuf);
// copy manifest file to dst dir
memset(srcBuf, 0, len);
memset(dstBuf, 0, len);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, bm->pManifest);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pManifest);
taosCopyFile(srcBuf, dstBuf);
// clear delta data buf
taosArrayClearP(bm->pAdd, taosMemoryFree);
taosArrayClearP(bm->pDel, taosMemoryFree);
taosMemoryFree(srcBuf);
taosMemoryFree(dstBuf);
taosMemoryFree(srcDir);
taosMemoryFree(dstDir);
return code;
}
SCfInit ginitDict[] = { SCfInit ginitDict[] = {
{"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
destroyFunc, encodeValueFunc, decodeValueFunc}, destroyFunc, encodeValueFunc, decodeValueFunc},
@ -151,34 +384,33 @@ int32_t copyFiles(const char* src, const char* dst) {
// opt later, just hard link // opt later, just hard link
int32_t sLen = strlen(src); int32_t sLen = strlen(src);
int32_t dLen = strlen(dst); int32_t dLen = strlen(dst);
char* absSrcPath = taosMemoryCalloc(1, sLen + 64); char* srcName = taosMemoryCalloc(1, sLen + 64);
char* absDstPath = taosMemoryCalloc(1, dLen + 64); char* dstName = taosMemoryCalloc(1, dLen + 64);
TdDirPtr pDir = taosOpenDir(src); TdDirPtr pDir = taosOpenDir(src);
if (pDir == NULL) return 0; if (pDir == NULL) return 0;
TdDirEntryPtr de = NULL; TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) { while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de); char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
sprintf(absSrcPath, "%s/%s", src, name); sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name);
sprintf(absDstPath, "%s/%s", dst, name); sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name);
if (!taosDirEntryIsDir(de)) { if (!taosDirEntryIsDir(de)) {
code = taosCopyFile(absSrcPath, absDstPath); code = taosCopyFile(srcName, dstName);
if (code == -1) { if (code == -1) {
goto _err; goto _err;
} }
} }
memset(absSrcPath, 0, sLen + 64); memset(srcName, 0, sLen + 64);
memset(absDstPath, 0, dLen + 64); memset(dstName, 0, dLen + 64);
} }
_err: _err:
taosMemoryFreeClear(absSrcPath); taosMemoryFreeClear(srcName);
taosMemoryFreeClear(absDstPath); taosMemoryFreeClear(dstName);
taosCloseDir(&pDir); taosCloseDir(&pDir);
return code >= 0 ? 0 : -1; return code >= 0 ? 0 : -1;
} }
@ -186,12 +418,16 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
// impl later // impl later
int32_t code = 0; int32_t code = 0;
// chkpId = 0; /*param@1: checkpointId dir
param@2: state
copy checkpointdir's file to state dir
opt to set hard link to previous file
*/
char* state = taosMemoryCalloc(1, strlen(path) + 32); char* state = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(state, "%s/%s", path, "state"); sprintf(state, "%s%s%s", path, TD_DIRSEP, "state");
if (chkpId != 0) { if (chkpId != 0) {
char* chkp = taosMemoryCalloc(1, strlen(path) + 64); char* chkp = taosMemoryCalloc(1, strlen(path) + 64);
sprintf(chkp, "%s/%s/checkpoint%" PRId64 "", path, "checkpoints", chkpId); sprintf(chkp, "%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
if (taosIsDir(chkp) && isValidCheckpoint(chkp)) { if (taosIsDir(chkp) && isValidCheckpoint(chkp)) {
if (taosIsDir(state)) { if (taosIsDir(state)) {
// remove dir if exists // remove dir if exists
@ -216,6 +452,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
return 0; return 0;
} }
void* streamBackendInit(const char* streamPath, int64_t chkpId) { void* streamBackendInit(const char* streamPath, int64_t chkpId) {
char* backendPath = NULL; char* backendPath = NULL;
int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath); int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath);
@ -295,12 +532,14 @@ _EXIT:
} }
void streamBackendCleanup(void* arg) { void streamBackendCleanup(void* arg) {
SBackendWrapper* pHandle = (SBackendWrapper*)arg; SBackendWrapper* pHandle = (SBackendWrapper*)arg;
void* pIter = taosHashIterate(pHandle->cfInst, NULL); void* pIter = taosHashIterate(pHandle->cfInst, NULL);
while (pIter != NULL) { while (pIter != NULL) {
RocksdbCfInst* inst = *(RocksdbCfInst**)pIter; RocksdbCfInst* inst = *(RocksdbCfInst**)pIter;
destroyRocksdbCfInst(inst); destroyRocksdbCfInst(inst);
pIter = taosHashIterate(pHandle->cfInst, pIter); pIter = taosHashIterate(pHandle->cfInst, pIter);
} }
taosHashCleanup(pHandle->cfInst); taosHashCleanup(pHandle->cfInst);
if (pHandle->db) { if (pHandle->db) {
@ -399,74 +638,75 @@ void streamBackendHandleCleanup(void* arg) {
int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) { int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) {
SStreamMeta* pMeta = arg; SStreamMeta* pMeta = arg;
taosWLockLatch(&pMeta->checkpointDirLock); taosWLockLatch(&pMeta->chkpDirLock);
int64_t tc = 0; int64_t tc = 0;
int32_t sz = taosArrayGetSize(pMeta->checkpointSaved); int32_t sz = taosArrayGetSize(pMeta->chkpSaved);
if (sz <= 0) { if (sz <= 0) {
taosWUnLockLatch(&pMeta->chkpDirLock);
return -1; return -1;
} else { } else {
tc = *(int64_t*)taosArrayGetLast(pMeta->checkpointSaved); tc = *(int64_t*)taosArrayGetLast(pMeta->chkpSaved);
} }
taosArrayPush(pMeta->checkpointInUse, &tc); taosArrayPush(pMeta->chkpInUse, &tc);
*checkpoint = tc; *checkpoint = tc;
taosWUnLockLatch(&pMeta->checkpointDirLock); taosWUnLockLatch(&pMeta->chkpDirLock);
return 0; return 0;
} }
/* /*
* checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--| * checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--|
* checkpointInUse: |--cp2--|--cp4--| * chkpInUse: |--cp2--|--cp4--|
* checkpointInUse is doing translation, cannot del until * chkpInUse is doing translation, cannot del until
* replication is finished * replication is finished
*/ */
int32_t delObsoleteCheckpoint(void* arg, const char* path) { int32_t delObsoleteCheckpoint(void* arg, const char* path) {
SStreamMeta* pMeta = arg; SStreamMeta* pMeta = arg;
taosWLockLatch(&pMeta->checkpointDirLock); taosWLockLatch(&pMeta->chkpDirLock);
SArray* checkpointDel = taosArrayInit(10, sizeof(int64_t)); SArray* chkpDel = taosArrayInit(10, sizeof(int64_t));
SArray* checkpointDup = taosArrayInit(10, sizeof(int64_t)); SArray* chkpDup = taosArrayInit(10, sizeof(int64_t));
int64_t minId = 0; int64_t minId = 0;
if (taosArrayGetSize(pMeta->checkpointInUse) >= 1) { if (taosArrayGetSize(pMeta->chkpInUse) >= 1) {
minId = *(int64_t*)taosArrayGet(pMeta->checkpointInUse, 0); minId = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0);
for (int i = 0; i < taosArrayGetSize(pMeta->checkpointSaved); i++) { for (int i = 0; i < taosArrayGetSize(pMeta->chkpSaved); i++) {
int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i); int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
if (id >= minId) { if (id >= minId) {
taosArrayPush(checkpointDup, &id); taosArrayPush(chkpDup, &id);
} else { } else {
taosArrayPush(checkpointDel, &id); taosArrayPush(chkpDel, &id);
} }
} }
} else { } else {
int32_t sz = taosArrayGetSize(pMeta->checkpointSaved); int32_t sz = taosArrayGetSize(pMeta->chkpSaved);
int32_t dsz = sz - pMeta->checkpointCap; // del size int32_t dsz = sz - pMeta->chkpCap; // del size
for (int i = 0; i < dsz; i++) { for (int i = 0; i < dsz; i++) {
int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i); int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
taosArrayPush(checkpointDel, &id); taosArrayPush(chkpDel, &id);
} }
for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) { for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) {
int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i); int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
taosArrayPush(checkpointDup, &id); taosArrayPush(chkpDup, &id);
} }
} }
taosArrayDestroy(pMeta->checkpointSaved); taosArrayDestroy(pMeta->chkpSaved);
pMeta->checkpointSaved = checkpointDup; pMeta->chkpSaved = chkpDup;
taosWUnLockLatch(&pMeta->checkpointDirLock); taosWUnLockLatch(&pMeta->chkpDirLock);
for (int i = 0; i < taosArrayGetSize(checkpointDel); i++) { for (int i = 0; i < taosArrayGetSize(chkpDel); i++) {
int64_t id = *(int64_t*)taosArrayGet(checkpointDel, i); int64_t id = *(int64_t*)taosArrayGet(chkpDel, i);
char tbuf[256] = {0}; char tbuf[256] = {0};
sprintf(tbuf, "%s/checkpoint%" PRId64 "", path, id); sprintf(tbuf, "%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, id);
if (taosIsDir(tbuf)) { if (taosIsDir(tbuf)) {
taosRemoveDir(tbuf); taosRemoveDir(tbuf);
} }
} }
taosArrayDestroy(checkpointDel); taosArrayDestroy(chkpDel);
return 0; return 0;
} }
@ -481,16 +721,21 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
int32_t code = 0; int32_t code = 0;
int32_t len = strlen(pMeta->path) + 30; int32_t len = strlen(pMeta->path) + 30;
char* checkpointPath = taosMemoryCalloc(1, len); char* chkpPath = taosMemoryCalloc(1, len);
sprintf(checkpointPath, "%s/%s", pMeta->path, "checkpoints"); sprintf(chkpPath, "%s%s%s", pMeta->path, TD_DIRSEP, "checkpoints");
if (!taosDirExist(checkpointPath)) { if (!taosDirExist(chkpPath)) {
// no checkpoint, nothing to load // no checkpoint, nothing to load
taosMemoryFree(chkpPath);
return 0; return 0;
} }
TdDirPtr pDir = taosOpenDir(checkpointPath); TdDirPtr pDir = taosOpenDir(chkpPath);
if (pDir == NULL) return 0;
if (pDir == NULL) {
taosMemoryFree(chkpPath);
return 0;
}
TdDirEntryPtr de = NULL; TdDirEntryPtr de = NULL;
SArray* suffix = taosArrayInit(4, sizeof(int64_t)); SArray* suffix = taosArrayInit(4, sizeof(int64_t));
@ -514,12 +759,12 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
for (int i = 0; i < taosArrayGetSize(suffix); i++) { for (int i = 0; i < taosArrayGetSize(suffix); i++) {
int64_t id = *(int64_t*)taosArrayGet(suffix, i); int64_t id = *(int64_t*)taosArrayGet(suffix, i);
taosArrayPush(pMeta->checkpointSaved, &id); taosArrayPush(pMeta->chkpSaved, &id);
} }
taosArrayDestroy(suffix); taosArrayDestroy(suffix);
taosCloseDir(&pDir); taosCloseDir(&pDir);
taosMemoryFree(checkpointPath); taosMemoryFree(chkpPath);
return 0; return 0;
} }
int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
@ -529,7 +774,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
int32_t code = -1; int32_t code = -1;
char path[256] = {0}; char path[256] = {0};
sprintf(path, "%s/%s", pMeta->path, "checkpoints"); sprintf(path, "%s%s%s", pMeta->path, TD_DIRSEP, "checkpoints");
code = taosMulModeMkDir(path, 0755); code = taosMulModeMkDir(path, 0755);
if (code != 0) { if (code != 0) {
qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code)); qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code));
@ -566,9 +811,9 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
} }
rocksdb_checkpoint_object_destroy(cp); rocksdb_checkpoint_object_destroy(cp);
} }
taosWLockLatch(&pMeta->checkpointDirLock); taosWLockLatch(&pMeta->chkpDirLock);
taosArrayPush(pMeta->checkpointSaved, &checkpointId); taosArrayPush(pMeta->chkpSaved, &checkpointId);
taosWUnLockLatch(&pMeta->checkpointDirLock); taosWUnLockLatch(&pMeta->chkpDirLock);
delObsoleteCheckpoint(arg, path); delObsoleteCheckpoint(arg, path);
@ -618,9 +863,9 @@ int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf,
return ret; return ret;
} }
} }
int streamStateValueIsStale(char* vv) { int streamStateValueIsStale(char* v) {
int64_t ts = 0; int64_t ts = 0;
taosDecodeFixedI64(vv, &ts); taosDecodeFixedI64(v, &ts);
return (ts != 0 && ts < taosGetTimestampMs()) ? 1 : 0; return (ts != 0 && ts < taosGetTimestampMs()) ? 1 : 0;
} }
int iterValueIsStale(rocksdb_iterator_t* iter) { int iterValueIsStale(rocksdb_iterator_t* iter) {
@ -726,8 +971,8 @@ int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char
return stateSessionKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2)); return stateSessionKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
} }
int stateSessionKeyEncode(void* ses, char* buf) { int stateSessionKeyEncode(void* k, char* buf) {
SStateSessionKey* sess = ses; SStateSessionKey* sess = k;
int len = 0; int len = 0;
len += taosEncodeFixedI64((void**)&buf, sess->key.win.skey); len += taosEncodeFixedI64((void**)&buf, sess->key.win.skey);
len += taosEncodeFixedI64((void**)&buf, sess->key.win.ekey); len += taosEncodeFixedI64((void**)&buf, sess->key.win.ekey);
@ -735,8 +980,8 @@ int stateSessionKeyEncode(void* ses, char* buf) {
len += taosEncodeFixedI64((void**)&buf, sess->opNum); len += taosEncodeFixedI64((void**)&buf, sess->opNum);
return len; return len;
} }
int stateSessionKeyDecode(void* ses, char* buf) { int stateSessionKeyDecode(void* k, char* buf) {
SStateSessionKey* sess = ses; SStateSessionKey* sess = k;
int len = 0; int len = 0;
char* p = buf; char* p = buf;
@ -951,33 +1196,23 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) {
SStreamValue key = {0}; SStreamValue key = {0};
char* p = value; char* p = value;
if (streamStateValueIsStale(p)) { if (streamStateValueIsStale(p)) {
if (dest != NULL) *dest = NULL; goto _EXCEPT;
return -1;
} }
p = taosDecodeFixedI64(p, &key.unixTimestamp); p = taosDecodeFixedI64(p, &key.unixTimestamp);
p = taosDecodeFixedI32(p, &key.len); p = taosDecodeFixedI32(p, &key.len);
if (vlen != (sizeof(int64_t) + sizeof(int32_t) + key.len)) { if (vlen != (sizeof(int64_t) + sizeof(int32_t) + key.len)) {
if (dest != NULL) *dest = NULL;
qError("vlen: %d, read len: %d", vlen, key.len); qError("vlen: %d, read len: %d", vlen, key.len);
return -1; goto _EXCEPT;
} }
if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len);
if (key.len == 0) { if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs();
key.data = NULL;
} else {
p = taosDecodeBinary(p, (void**)&(key.data), key.len);
}
if (ttl != NULL) {
int64_t now = taosGetTimestampMs();
*ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - now;
}
if (dest != NULL) {
*dest = key.data;
} else {
taosMemoryFree(key.data);
}
return key.len; return key.len;
_EXCEPT:
if (dest != NULL) *dest = NULL;
if (ttl != NULL) *ttl = 0;
return -1;
} }
const char* compareDefaultName(void* arg) { const char* compareDefaultName(void* arg) {
@ -1097,12 +1332,14 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
cfHandle[0] = NULL; cfHandle[0] = NULL;
} }
rocksdb_options_destroy(cfOpts[0]); rocksdb_options_destroy(cfOpts[0]);
handle->db = db; handle->db = db;
static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
for (int i = 0; i < nCf; i++) { for (int i = 0; i < nCf; i++) {
char* cf = cfs[i]; char* cf = cfs[i];
if (i == 0) continue; if (i == 0) continue; // skip default column family, not set opt
char funcname[64] = {0}; char funcname[64] = {0};
if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) { if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
char idstr[128] = {0}; char idstr[128] = {0};
@ -1135,9 +1372,9 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
inst->pHandle[idx] = cfHandle[i]; inst->pHandle[idx] = cfHandle[i];
} }
} }
void** pIter = taosHashIterate(handle->cfInst, NULL); void* pIter = taosHashIterate(handle->cfInst, NULL);
while (pIter) { while (pIter) {
RocksdbCfInst* inst = *pIter; RocksdbCfInst* inst = *(RocksdbCfInst**)pIter;
for (int i = 0; i < cfLen; i++) { for (int i = 0; i < cfLen; i++) {
if (inst->cfOpt[i] == NULL) { if (inst->cfOpt[i] == NULL) {
@ -1178,8 +1415,8 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
// taosAcquireRef(streamBackendId, pState->streamBackendRid); // taosAcquireRef(streamBackendId, pState->streamBackendRid);
SBackendWrapper* handle = backend; SBackendWrapper* handle = backend;
SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper)); SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper));
taosThreadMutexLock(&handle->cfMutex);
taosThreadMutexLock(&handle->cfMutex);
RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
if (ppInst != NULL && *ppInst != NULL) { if (ppInst != NULL && *ppInst != NULL) {
RocksdbCfInst* inst = *ppInst; RocksdbCfInst* inst = *ppInst;
@ -1470,6 +1707,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
qDebug("streamStateClear_rocksdb"); qDebug("streamStateClear_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
char sKeyStr[128] = {0}; char sKeyStr[128] = {0};
char eKeyStr[128] = {0}; char eKeyStr[128] = {0};
SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number};
@ -1508,8 +1746,8 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
qDebug("streamStateGetFirst_rocksdb"); qDebug("streamStateGetFirst_rocksdb");
SWinKey tmp = {.ts = 0, .groupId = 0}; SWinKey tmp = {.ts = 0, .groupId = 0};
streamStatePut_rocksdb(pState, &tmp, NULL, 0); streamStatePut_rocksdb(pState, &tmp, NULL, 0);
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0);
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
streamStateDel_rocksdb(pState, &tmp); streamStateDel_rocksdb(pState, &tmp);
@ -1631,6 +1869,9 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX};
STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0);
if (code != 0) {
return NULL;
}
char buf[128] = {0}; char buf[128] = {0};
int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
@ -1667,9 +1908,10 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateGetCur_rocksdb"); qDebug("streamStateGetCur_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
pCur->db = wrapper->rocksdb; pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt); (rocksdb_readoptions_t**)&pCur->readOpt);
@ -1727,23 +1969,21 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo
SSessionKey resKey = *key; SSessionKey resKey = *key;
void* tmp = NULL; void* tmp = NULL;
int32_t vLen = 0; int32_t vLen = 0;
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen);
if (code == 0) {
if (pVLen != NULL) *pVLen = vLen;
if (key->win.skey != resKey.win.skey) { code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen);
code = -1; if (code == 0 && key->win.skey != resKey.win.skey) {
} else {
*key = resKey; *key = resKey;
if (pVal != NULL && pVLen != NULL) {
*pVal = taosMemoryCalloc(1, *pVLen); if (pVal) {
memcpy(*pVal, tmp, *pVLen); *pVal = tmp;
} tmp = NULL;
} };
if (pVLen) *pVLen = vLen;
} else {
code = -1;
} }
taosMemoryFree(tmp); taosMemoryFree(tmp);
streamStateFreeCur(pCur);
// impl later
return code; return code;
} }
@ -1789,8 +2029,6 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
rocksdb_iter_prev(pCur->iter); rocksdb_iter_prev(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) { if (!rocksdb_iter_valid(pCur->iter)) {
// qWarn("streamState failed to seek key prev
// %s", toString);
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
@ -1809,9 +2047,9 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
pCur->number = pState->number; pCur->number = pState->number;
char buf[128] = {0}; char buf[128] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf); int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
@ -1859,6 +2097,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
size_t klen; size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen); const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0}; SStateSessionKey curKey = {0};
@ -2355,9 +2594,7 @@ void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
} }
int32_t streamDefaultIterValid_rocksdb(void* iter) { int32_t streamDefaultIterValid_rocksdb(void* iter) {
SStreamStateCur* pCur = iter; SStreamStateCur* pCur = iter;
bool val = rocksdb_iter_valid(pCur->iter); return rocksdb_iter_valid(pCur->iter) ? 1 : 0;
return val ? 1 : 0;
} }
void streamDefaultIterSeek_rocksdb(void* iter, const char* key) { void streamDefaultIterSeek_rocksdb(void* iter, const char* key) {
SStreamStateCur* pCur = iter; SStreamStateCur* pCur = iter;
@ -2373,13 +2610,16 @@ char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) {
} }
char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) { char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) {
SStreamStateCur* pCur = iter; SStreamStateCur* pCur = iter;
char* ret = NULL;
int32_t vlen = 0; int32_t vlen = 0;
char* dst = NULL; const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vlen);
const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vlen); *len = decodeValueFunc((void*)val, vlen, NULL, &ret);
if (decodeValueFunc((void*)vval, vlen, NULL, &dst) < 0) { if (*len < 0) {
return NULL; return NULL;
} }
return dst;
return ret;
} }
// batch func // batch func
void* streamStateCreateBatch() { void* streamStateCreateBatch() {
@ -2396,8 +2636,8 @@ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rock
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key, int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen, int64_t ttl) { void* val, int32_t vlen, int64_t ttl) {
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
int i = streamStateGetCfIdx(pState, cfKeyName);
int i = streamStateGetCfIdx(pState, cfKeyName);
if (i < 0) { if (i < 0) {
qError("streamState failed to put to cf name:%s", cfKeyName); qError("streamState failed to put to cf name:%s", cfKeyName);
return -1; return -1;
@ -2434,6 +2674,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
if (tmpBuf == NULL) { if (tmpBuf == NULL) {
taosMemoryFree(ttlV); taosMemoryFree(ttlV);
} }
{ {
char tbuf[256] = {0}; char tbuf[256] = {0};
ginitDict[cfIdx].toStrFunc((void*)key, tbuf); ginitDict[cfIdx].toStrFunc((void*)key, tbuf);

View File

@ -48,22 +48,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
return NULL; return NULL;
} }
int32_t len = strlen(path) + 20; char* tpath = taosMemoryCalloc(1, strlen(path) + 64);
char* streamPath = taosMemoryCalloc(1, len); sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream");
sprintf(streamPath, "%s/%s", path, "stream"); pMeta->path = tpath;
pMeta->path = taosStrdup(streamPath);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
goto _err; goto _err;
} }
memset(streamPath, 0, len);
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
code = taosMulModeMkDir(streamPath, 0755);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
goto _err;
}
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
goto _err; goto _err;
} }
@ -100,12 +91,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->pTaskBackendUnique = pMeta->pTaskBackendUnique =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t)); pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t));
pMeta->checkpointInUse = taosArrayInit(4, sizeof(int64_t)); pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t));
pMeta->checkpointCap = 8; pMeta->chkpCap = 8;
taosInitRWLatch(&pMeta->checkpointDirLock); taosInitRWLatch(&pMeta->chkpDirLock);
int64_t chkpId = streamGetLatestCheckpointId(pMeta); int64_t chkpId = streamGetLatestCheckpointId(pMeta);
pMeta->chkpId = chkpId;
pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId); pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId);
if (pMeta->streamBackend == NULL) { if (pMeta->streamBackend == NULL) {
@ -118,9 +110,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
goto _err; goto _err;
} }
taosMemoryFree(streamPath);
taosInitRWLatch(&pMeta->lock); taosInitRWLatch(&pMeta->lock);
taosThreadMutexInit(&pMeta->backendMutex, NULL); taosThreadMutexInit(&pMeta->backendMutex, NULL);
@ -128,7 +117,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
return pMeta; return pMeta;
_err: _err:
taosMemoryFree(streamPath);
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
@ -141,6 +129,66 @@ _err:
return NULL; return NULL;
} }
/*
 * Stop every task registered in pMeta, close the stream backend, replace the
 * "<pMeta->path>/state" directory with "<pMeta->path>/received", and open a new
 * backend on pMeta->path.
 *
 * @param pMeta  stream meta whose tasks and backend are being reset
 * @param chkpId currently unused by this function
 * @return 0 on success; -1 if the path buffers cannot be allocated, the
 *         "received" directory cannot be renamed, or the backend fails to open
 *
 * The two directory paths are built before anything is torn down so that an
 * allocation failure leaves pMeta untouched, and they are released on every
 * exit path (the previous version leaked both on success and when
 * streamBackendInit failed).
 */
int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
  (void)chkpId;

  size_t pathCap = strlen(pMeta->path) + 64;
  char*  defaultPath = taosMemoryCalloc(1, pathCap);
  char*  newPath = taosMemoryCalloc(1, pathCap);
  if (defaultPath == NULL || newPath == NULL) {
    taosMemoryFree(defaultPath);
    taosMemoryFree(newPath);
    return -1;
  }
  snprintf(defaultPath, pathCap, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
  snprintf(newPath, pathCap, "%s%s%s", pMeta->path, TD_DIRSEP, "received");

  // stop all running tasks; they are reopened later
  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pMeta->pTasks, pIter);
    if (pIter == NULL) {
      break;
    }

    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->schedTimer) {
      taosTmrStop(pTask->schedTimer);
      pTask->schedTimer = NULL;
    }

    if (pTask->launchTaskTimer) {
      taosTmrStop(pTask->launchTaskTimer);
      pTask->launchTaskTimer = NULL;
    }
    // NOTE(review): the freed task pointers stay in pMeta->pTasks until the
    // taosHashClear() below; on the early-return paths they are left in place.
    tFreeStreamTask(pTask);
  }

  // close stream backend
  streamBackendCleanup(pMeta->streamBackend);
  taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
  pMeta->streamBackendRid = -1;
  pMeta->streamBackend = NULL;

  // drop the old state directory and move the received one into its place
  taosRemoveDir(defaultPath);
  int32_t renameCode = taosRenameFile(newPath, defaultPath);

  // the path strings are not needed past this point
  taosMemoryFree(defaultPath);
  taosMemoryFree(newPath);

  if (renameCode < 0) {
    return -1;
  }

  pMeta->streamBackend = streamBackendInit(pMeta->path, 0);
  if (pMeta->streamBackend == NULL) {
    return -1;
  }
  pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);

  // forget all per-task and per-checkpoint bookkeeping
  taosHashClear(pMeta->pTasks);
  taosArrayClear(pMeta->pTaskList);
  taosHashClear(pMeta->pTaskBackendUnique);
  taosArrayClear(pMeta->chkpSaved);
  taosArrayClear(pMeta->chkpInUse);

  return 0;
}
void streamMetaClose(SStreamMeta* pMeta) { void streamMetaClose(SStreamMeta* pMeta) {
qDebug("start to close stream meta"); qDebug("start to close stream meta");
if (pMeta == NULL) { if (pMeta == NULL) {
@ -168,8 +216,8 @@ void streamMetaClose(SStreamMeta* pMeta) {
taosThreadMutexDestroy(&pMeta->backendMutex); taosThreadMutexDestroy(&pMeta->backendMutex);
taosHashCleanup(pMeta->pTaskBackendUnique); taosHashCleanup(pMeta->pTaskBackendUnique);
taosArrayDestroy(pMeta->checkpointSaved); taosArrayDestroy(pMeta->chkpSaved);
taosArrayDestroy(pMeta->checkpointInUse); taosArrayDestroy(pMeta->chkpInUse);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
qDebug("end to close stream meta"); qDebug("end to close stream meta");

View File

@ -64,6 +64,7 @@ struct SStreamSnapReader {
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
SStreamSnapHandle handle; SStreamSnapHandle handle;
int64_t checkpointId;
}; };
struct SStreamSnapWriter { struct SStreamSnapWriter {
void* pMeta; void* pMeta;
@ -78,31 +79,60 @@ const char* ROCKSDB_CURRENT = "CURRENT";
const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT"; const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT";
static int64_t kBlockSize = 64 * 1024; static int64_t kBlockSize = 64 * 1024;
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path); int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, int64_t chkpId);
void streamSnapHandleDestroy(SStreamSnapHandle* handle); void streamSnapHandleDestroy(SStreamSnapHandle* handle);
// static void streamBuildFname(char* path, char* file, char* fullname) // static void streamBuildFname(char* path, char* file, char* fullname)
#define STREAM_ROCKSDB_BUILD_FULLNAME(path, file, fullname) \ #define STREAM_ROCKSDB_BUILD_FULLNAME(path, file, fullname) \
do { \ do { \
sprintf(fullname, "%s/%s", path, file); \ sprintf(fullname, "%s%s%s", path, TD_DIRSEP, file); \
} while (0) } while (0)
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) { int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
int ret = 0;
char* fullname = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name);
ret = taosStatFile(fullname, sz, NULL);
taosMemoryFree(fullname);
return ret;
}
TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
char fullname[256] = {0};
STREAM_ROCKSDB_BUILD_FULLNAME(path, name, fullname);
return taosOpenFile(fullname, opt);
}
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId) {
// impl later // impl later
int len = strlen(path);
char* tdir = taosMemoryCalloc(1, len + 128);
memcpy(tdir, path, len);
if (chkpId != 0) {
sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP,
chkpId);
} else {
sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state");
}
int32_t code = 0; int32_t code = 0;
TdDirPtr pDir = taosOpenDir(path); TdDirPtr pDir = taosOpenDir(tdir);
if (NULL == pDir) { if (NULL == pDir) {
qError("stream-state failed to open %s", tdir);
goto _err; goto _err;
} }
SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile)); SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
pHandle->pBackendFile = pFile; pHandle->pBackendFile = pFile;
pHandle->checkpointId = 0; pHandle->checkpointId = chkpId;
pHandle->seraial = 0; pHandle->seraial = 0;
pFile->path = taosStrdup(path); pFile->path = tdir;
pFile->pSst = taosArrayInit(16, sizeof(void*)); pFile->pSst = taosArrayInit(16, sizeof(void*));
TdDirEntryPtr pDirEntry; TdDirEntryPtr pDirEntry;
@ -117,7 +147,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) {
continue; continue;
} }
if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) { if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) {
pFile->pMainfest = taosStrdup(name); pFile->pOptions = taosStrdup(name);
continue; continue;
} }
if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_META) && if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_META) &&
@ -134,7 +164,9 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) {
taosCloseDir(&pDir); taosCloseDir(&pDir);
if (pFile->pCurrent == NULL) { if (pFile->pCurrent == NULL) {
qError("stream-state failed to open %s, reason: no valid file", tdir);
code = -1; code = -1;
tdir = NULL;
goto _err; goto _err;
} }
SArray* list = taosArrayInit(64, sizeof(SBackendFileItem)); SArray* list = taosArrayInit(64, sizeof(SBackendFileItem));
@ -143,50 +175,45 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) {
// current // current
item.name = pFile->pCurrent; item.name = pFile->pCurrent;
item.type = ROCKSDB_CURRENT_TYPE; item.type = ROCKSDB_CURRENT_TYPE;
taosStatFile(pFile->pCurrent, &item.size, NULL); streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item); taosArrayPush(list, &item);
// mainfest // mainfest
item.name = pFile->pMainfest; item.name = pFile->pMainfest;
item.type = ROCKSDB_MAINFEST_TYPE; item.type = ROCKSDB_MAINFEST_TYPE;
taosStatFile(pFile->pMainfest, &item.size, NULL); streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item); taosArrayPush(list, &item);
// options // options
item.name = pFile->pOptions; item.name = pFile->pOptions;
item.type = ROCKSDB_OPTIONS_TYPE; item.type = ROCKSDB_OPTIONS_TYPE;
taosStatFile(pFile->pOptions, &item.size, NULL); streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item); taosArrayPush(list, &item);
// sst // sst
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
char* sst = taosArrayGetP(pFile->pSst, i); char* sst = taosArrayGetP(pFile->pSst, i);
item.name = sst; item.name = sst;
item.type = ROCKSDB_SST_TYPE; item.type = ROCKSDB_SST_TYPE;
taosStatFile(sst, &item.size, NULL); streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item); taosArrayPush(list, &item);
} }
// meta // meta
item.name = pFile->pCheckpointMeta; item.name = pFile->pCheckpointMeta;
item.type = ROCKSDB_CHECKPOINT_META_TYPE; item.type = ROCKSDB_CHECKPOINT_META_TYPE;
taosStatFile(pFile->pCheckpointMeta, &item.size, NULL); if (streamGetFileSize(pFile->path, item.name, &item.size) == 0) {
taosArrayPush(list, &item); taosArrayPush(list, &item);
}
pHandle->pBackendFile = pFile; pHandle->pBackendFile = pFile;
pHandle->currFileIdx = 0; pHandle->currFileIdx = 0;
pHandle->pFileList = list; pHandle->pFileList = list;
char fullname[256] = {0};
char* file = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, file, fullname);
pHandle->fd = taosOpenFile(fullname, TD_FILE_READ);
if (pHandle->fd == NULL) {
goto _err;
}
pHandle->seraial = 0; pHandle->seraial = 0;
pHandle->offset = 0; pHandle->offset = 0;
return 0; return 0;
_err: _err:
streamSnapHandleDestroy(pHandle); streamSnapHandleDestroy(pHandle);
taosMemoryFreeClear(tdir);
code = -1; code = -1;
return code; return code;
@ -194,6 +221,7 @@ _err:
void streamSnapHandleDestroy(SStreamSnapHandle* handle) { void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
SBanckendFile* pFile = handle->pBackendFile; SBanckendFile* pFile = handle->pBackendFile;
if (pFile) {
taosMemoryFree(pFile->pCheckpointMeta); taosMemoryFree(pFile->pCheckpointMeta);
taosMemoryFree(pFile->pCurrent); taosMemoryFree(pFile->pCurrent);
taosMemoryFree(pFile->pMainfest); taosMemoryFree(pFile->pMainfest);
@ -205,20 +233,21 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
} }
taosArrayDestroy(pFile->pSst); taosArrayDestroy(pFile->pSst);
taosMemoryFree(pFile); taosMemoryFree(pFile);
}
taosArrayDestroy(handle->pFileList); taosArrayDestroy(handle->pFileList);
taosCloseFile(&handle->fd); taosCloseFile(&handle->fd);
return; return;
} }
int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapReader** ppReader) { int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* path, SStreamSnapReader** ppReader) {
// impl later // impl later
SStreamSnapReader* pReader = taosMemoryCalloc(1, sizeof(SStreamSnapReader)); SStreamSnapReader* pReader = taosMemoryCalloc(1, sizeof(SStreamSnapReader));
if (pReader == NULL) { if (pReader == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
// const char* path = NULL;
if (streamSnapHandleInit(&pReader->handle, (char*)path) < 0) { if (streamSnapHandleInit(&pReader->handle, (char*)path, chkpId) < 0) {
taosMemoryFree(pReader); taosMemoryFree(pReader);
return -1; return -1;
} }
@ -242,17 +271,32 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
if (pHandle->fd == NULL) {
if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) {
// finish
*ppData = NULL;
*size = 0;
return 0;
} else {
pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
qDebug("stream-state open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", item->name,
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
}
}
qDebug("stream-state start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", item->name,
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
if (nread == -1) { if (nread == -1) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
qError("stream snap failed to read snap, file name:%s, type:%d, reason:%s", item->name, item->type, qError("stream-state snap failed to read snap, file name:%s, type:%d,reason:%s", item->name, item->type,
tstrerror(code)); tstrerror(code));
return code;
// handle later
return -1; return -1;
} else if (nread > 0 && nread <= kBlockSize) { } else if (nread > 0 && nread <= kBlockSize) {
// left bytes less than kBlockSize // left bytes less than kBlockSize
qDebug("stream-state read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", item->name,
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
pHandle->offset += nread; pHandle->offset += nread;
if (pHandle->offset >= item->size || nread < kBlockSize) { if (pHandle->offset >= item->size || nread < kBlockSize) {
taosCloseFile(&pHandle->fd); taosCloseFile(&pHandle->fd);
@ -260,6 +304,11 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
pHandle->currFileIdx += 1; pHandle->currFileIdx += 1;
} }
} else { } else {
qDebug("stream-state no data read, close file no.%d, move to next file, open and read", pHandle->currFileIdx);
taosCloseFile(&pHandle->fd);
pHandle->offset = 0;
pHandle->currFileIdx += 1;
if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) { if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) {
// finish // finish
*ppData = NULL; *ppData = NULL;
@ -267,12 +316,13 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
return 0; return 0;
} }
item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
char fullname[256] = {0}; pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, item->name, fullname);
pHandle->fd = taosOpenFile(fullname, TD_FILE_READ);
nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
pHandle->offset += nread; pHandle->offset += nread;
qDebug("stream-state open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d",
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
} }
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf; SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;
@ -310,7 +360,7 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path
pHandle->pFileList = list; pHandle->pFileList = list;
pHandle->currFileIdx = 0; pHandle->currFileIdx = 0;
pHandle->offset = 0; pHandle->offset = 0;
pHandle->fd = taosOpenFile(taosArrayGet(pHandle->pFileList, pHandle->currFileIdx), TD_FILE_WRITE);
*ppWriter = pWriter; *ppWriter = pWriter;
return 0; return 0;
} }
@ -321,14 +371,25 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData; SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData;
SStreamSnapHandle* pHandle = &pWriter->handle; SStreamSnapHandle* pHandle = &pWriter->handle;
SBanckendFile* pFile = pHandle->pBackendFile; SBanckendFile* pFile = pHandle->pBackendFile;
SBackendFileItem* pItem = taosArrayGetP(pHandle->pFileList, pHandle->currFileIdx); SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
if (taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset) != pHdr->size) { if (pHandle->fd == NULL) {
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
qError("stream snap failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code)); qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name,
tstrerror(code));
}
}
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
int64_t bytes = taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
if (bytes != pHdr->size) {
code = TAOS_SYSTEM_ERROR(terrno);
qError("stream-state failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code));
return code; return code;
} }
pHandle->offset += pHdr->size; pHandle->offset += bytes;
} else { } else {
taosCloseFile(&pHandle->fd); taosCloseFile(&pHandle->fd);
pHandle->offset = 0; pHandle->offset = 0;
@ -339,10 +400,13 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
item.type = pHdr->type; item.type = pHdr->type;
taosArrayPush(pHandle->pFileList, &item); taosArrayPush(pHandle->pFileList, &item);
char fullname[256] = {0}; SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
char* name = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, taosArrayGetSize(pHandle->pFileList) - 1))->name; pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, name, fullname); if (pHandle->fd == NULL) {
pHandle->fd = taosOpenFile(fullname, TD_FILE_WRITE); code = TAOS_SYSTEM_ERROR(terrno);
qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name,
tstrerror(code));
}
taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset); taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
pHandle->offset += pHdr->size; pHandle->offset += pHdr->size;
@ -367,6 +431,7 @@ int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) {
qDebug("stream snap get file list, %s", buf); qDebug("stream snap get file list, %s", buf);
taosMemoryFree(buf); taosMemoryFree(buf);
} }
for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) { for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) {
SBackendFileItem* item = taosArrayGet(handle->pFileList, i); SBackendFileItem* item = taosArrayGet(handle->pFileList, i);
taosMemoryFree(item->name); taosMemoryFree(item->name);

View File

@ -108,7 +108,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
SStreamTask* pStreamTask = pTask; SStreamTask* pStreamTask = pTask;
char statePath[1024]; char statePath[1024];
if (!specPath) { if (!specPath) {
sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId); sprintf(statePath, "%s%s%d", path, TD_DIRSEP, pStreamTask->id.taskId);
} else { } else {
memset(statePath, 0, 1024); memset(statePath, 0, 1024);
tstrncpy(statePath, path, 1024); tstrncpy(statePath, path, 1024);

View File

@ -804,7 +804,7 @@ TEST(TdbPageRecycleTest, recycly_delete_interior_ofp_nocommit) {
// sprintf(&key[count - 2], "%c", i); // sprintf(&key[count - 2], "%c", i);
key[count - 2] = '0' + i; key[count - 2] = '0' + i;
ret = tdbTbInsert(pDb, key, count, NULL, NULL, txn); ret = tdbTbInsert(pDb, key, count, NULL, 0, txn);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
} }
} }

View File

@ -360,6 +360,23 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) {
pArray->size = 0; pArray->size = 0;
} }
/*
 * Empty an array whose elements are pointers: fp is called once with each
 * stored pointer value (not with the address of the slot), then the array is
 * cleared via taosArrayClear.
 *
 * @param pArray array to clear; when NULL no element callbacks are made
 * @param fp     callback applied to every stored pointer; when NULL the
 *               elements are left untouched and the array is only cleared
 */
void taosArrayClearP(SArray* pArray, void (*fp)(void*)) {
  // guard fp as well as pArray: a NULL callback means "just reset the array"
  if (pArray != NULL && fp != NULL) {
    for (int32_t i = 0; i < pArray->size; i++) {
      fp(*(void**)TARRAY_GET_ELEM(pArray, i));
    }
  }
  taosArrayClear(pArray);
}
void* taosArrayDestroy(SArray* pArray) { void* taosArrayDestroy(SArray* pArray) {
if (pArray) { if (pArray) {