Merge branch 'enh/chkpTransfer' of https://github.com/taosdata/TDengine into enh/chkpTransfer

This commit is contained in:
yihaoDeng 2023-08-10 14:57:25 +08:00
commit b2fb94d7f8
14 changed files with 442 additions and 152 deletions

View File

@ -369,11 +369,11 @@ typedef struct SStreamMeta {
int32_t chkptNotReadyTasks;
int64_t checkpointId;
SArray* checkpointSaved;
SArray* checkpointInUse;
int32_t checkpointCap;
SRWLatch checkpointDirLock;
int64_t chkpId;
SArray* chkpSaved;
SArray* chkpInUse;
int32_t chkpCap;
SRWLatch chkpDirLock;
} SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
@ -611,7 +611,8 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId);
// int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId);
int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId);
int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);

View File

@ -89,7 +89,7 @@ typedef struct SRpcInit {
int32_t retryMinInterval; // retry init interval
int32_t retryStepFactor; // retry interval factor
int32_t retryMaxInterval; // retry max interval
int64_t retryMaxTimouet;
int64_t retryMaxTimeout;
int32_t failFastThreshold;
int32_t failFastInterval;

View File

@ -200,8 +200,11 @@ void taosArrayClear(SArray* pArray);
* @param pArray
* @param fp
*/
void taosArrayClearEx(SArray* pArray, void (*fp)(void*));
void taosArrayClearP(SArray* pArray, void (*fp)(void*));
void* taosArrayDestroy(SArray* pArray);
void taosArrayDestroyP(SArray* pArray, FDelete fp);

View File

@ -169,7 +169,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.retryMinInterval = tsRedirectPeriod;
rpcInit.retryStepFactor = tsRedirectFactor;
rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime;
rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
connLimitNum = TMAX(connLimitNum, 10);

View File

@ -298,7 +298,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.retryMinInterval = tsRedirectPeriod;
rpcInit.retryStepFactor = tsRedirectFactor;
rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime;
rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
rpcInit.failFastInterval = 5000; // interval threshold(ms)
rpcInit.failFastThreshold = 3; // failed threshold

View File

@ -331,6 +331,9 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId);
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter);
// SStreamTaskReader ======================================
// SStreamStateWriter =====================================
// SStreamStateReader =====================================

View File

@ -47,11 +47,11 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
pReader->sver = sver;
pReader->ever = ever;
int64_t checkpointId = meta ? meta->checkpointId : 0;
int64_t chkpId = meta ? meta->chkpId : 0;
SStreamSnapReader* pSnapReader = NULL;
if (streamSnapReaderOpen(pTq, sver, checkpointId, pTq->path, &pSnapReader) == 0) {
if (streamSnapReaderOpen(pTq, sver, chkpId, pTq->path, &pSnapReader) == 0) {
pReader->complete = 1;
} else {
code = -1;
@ -163,7 +163,20 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
return code;
}
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId) {
return streamStateRebuild(pWriter->pTq->pStreamMeta, path, chkpId);
int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId);
if (code == 0) {
code = streamStateLoadTasks(pWriter);
}
return code;
}
int32_t streamStateLoadTasksImpl(SStreamMeta* pMeta, int64_t ver) {
// impl later
return streamLoadTasks(pMeta, ver);
}
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) {
SWal* pWal = pWriter->pTq->pVnode->pWal;
return streamStateLoadTasksImpl(pWriter->pTq->pStreamMeta, walGetCommittedVer(pWal));
}
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {

View File

@ -409,6 +409,9 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
if (pWriter->pStreamStateWriter) {
code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback);
if (code) goto _exit;
code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, NULL, 0);
if (code) goto _exit;
}
if (pWriter->pRsmaSnapWriter) {

View File

@ -20,6 +20,27 @@
#include "tcommon.h"
#include "tref.h"
typedef struct {
int8_t init;
char* pCurrent;
char* pManifest;
SArray* pSST;
int64_t preCkptId;
int64_t curChkpId;
char* path;
char* buf;
int32_t len;
// ping-pong buf
SHashObj* pSstTbl[2];
int8_t idx;
SArray* pAdd;
SArray* pDel;
int8_t update;
} SBackendManager;
typedef struct SCompactFilteFactory {
void* status;
} SCompactFilteFactory;
@ -127,6 +148,218 @@ void destroyFunc(void* arg);
int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest);
int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest);
SBackendManager* bkdMgtCreate(char* path) {
SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager));
p->curChkpId = 0;
p->preCkptId = 0;
p->pSST = taosArrayInit(64, sizeof(void*));
p->path = taosStrdup(path);
p->len = strlen(path) + 128;
p->buf = taosMemoryCalloc(1, p->len);
p->idx = 0;
p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
p->pAdd = taosArrayInit(64, sizeof(void*));
p->pDel = taosArrayInit(64, sizeof(void*));
p->update = 0;
return p;
}
void bkdMgtDestroy(SBackendManager* bm) {
if (bm == NULL) return;
taosMemoryFree(bm->buf);
taosMemoryFree(bm->path);
taosArrayDestroyP(bm->pSST, taosMemoryFree);
taosArrayDestroyP(bm->pAdd, taosMemoryFree);
taosArrayDestroyP(bm->pDel, taosMemoryFree);
taosHashCleanup(bm->pSstTbl[0]);
taosHashCleanup(bm->pSstTbl[1]);
taosMemoryFree(bm);
}
int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
int32_t code = 0;
size_t len = 0;
void* pIter = taosHashIterate(p2, NULL);
while (pIter) {
char* name = taosHashGetKey(pIter, &len);
if (!taosHashGet(p1, name, len)) {
char* p = taosStrdup(name);
taosArrayPush(diff, &p);
}
pIter = taosHashIterate(p2, pIter);
}
return code;
}
int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) {
int32_t code = 0;
code = compareHashTableImpl(p1, p2, add);
code = compareHashTableImpl(p2, p1, del);
return code;
}
int32_t bkdMgtGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) {
const char* pCurrent = "CURRENT";
int32_t currLen = strlen(pCurrent);
const char* pManifest = "MANIFEST-";
int32_t maniLen = strlen(pManifest);
const char* pSST = ".sst";
int32_t sstLen = strlen(pSST);
memset(bm->buf, 0, bm->len);
sprintf(bm->buf, "%s%scheckpoint%" PRId64 "", bm->path, TD_DIRSEP, chkpId);
taosArrayClearP(bm->pAdd, taosMemoryFree);
taosArrayClearP(bm->pDel, taosMemoryFree);
TdDirPtr pDir = taosOpenDir(bm->buf);
TdDirEntryPtr de = NULL;
int8_t dummy = 0;
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) {
taosMemoryFreeClear(bm->pCurrent);
bm->pCurrent = taosStrdup(name);
taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy));
continue;
}
if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) {
taosMemoryFreeClear(bm->pManifest);
bm->pManifest = taosStrdup(name);
taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy));
continue;
}
if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) {
char* p = taosStrdup(name);
taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy));
continue;
}
}
if (bm->init == 0) {
bm->preCkptId = -1;
bm->curChkpId = chkpId;
bm->init = 1;
void* pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], NULL);
while (pIter) {
size_t len;
char* name = taosHashGetKey(pIter, &len);
if (name != NULL && len != 0) {
taosArrayPush(bm->pAdd, &name);
}
pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], pIter);
}
if (taosArrayGetSize(bm->pAdd) > 0) bm->update = 1;
} else {
int32_t code = compareHashTable(bm->pSstTbl[bm->idx], bm->pSstTbl[1 - bm->idx], bm->pAdd, bm->pDel);
if (code != 0) {
// dead code
taosArrayClearP(bm->pAdd, taosMemoryFree);
taosArrayClearP(bm->pDel, taosMemoryFree);
taosHashClear(bm->pSstTbl[1 - bm->idx]);
bm->update = 0;
return code;
}
bm->preCkptId = bm->curChkpId;
bm->curChkpId = chkpId;
if (taosArrayGetSize(bm->pAdd) == 0 && taosArrayGetSize(bm->pDel) == 0) {
bm->update = 0;
}
}
taosHashClear(bm->pSstTbl[bm->idx]);
bm->idx = 1 - bm->idx;
return 0;
}
int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) {
int32_t code = 0;
int32_t len = bm->len + 128;
char* dstBuf = taosMemoryCalloc(1, len);
char* srcBuf = taosMemoryCalloc(1, len);
char* srcDir = taosMemoryCalloc(1, len);
char* dstDir = taosMemoryCalloc(1, len);
sprintf(srcDir, "%s%s%s%" PRId64 "", bm->path, TD_DIRSEP, "checkpoint", bm->curChkpId);
sprintf(dstDir, "%s%s%s", bm->path, TD_DIRSEP, dname);
if (!taosDirExist(srcDir)) {
return 0;
}
code = taosMkDir(dstDir);
if (code != 0) {
return code;
}
// clear current file
memset(dstBuf, 0, len);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pCurrent);
taosRemoveFile(dstBuf);
memset(dstBuf, 0, len);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pManifest);
taosRemoveFile(dstBuf);
// add file to $name dir
for (int i = 0; i < taosArrayGetSize(bm->pAdd); i++) {
memset(dstBuf, 0, len);
memset(srcBuf, 0, len);
char* filename = taosArrayGetP(bm->pAdd, i);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename);
taosCopyFile(srcBuf, dstBuf);
}
// del file in $name
for (int i = 0; i < taosArrayGetSize(bm->pDel); i++) {
memset(dstBuf, 0, len);
memset(srcBuf, 0, len);
char* filename = taosArrayGetP(bm->pDel, i);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename);
taosRemoveFile(dstBuf);
}
// copy current file to dst dir
memset(srcBuf, 0, len);
memset(dstBuf, 0, len);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, bm->pCurrent);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pCurrent);
taosCopyFile(srcBuf, dstBuf);
// copy manifest file to dst dir
memset(srcBuf, 0, len);
memset(dstBuf, 0, len);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, bm->pManifest);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pManifest);
taosCopyFile(srcBuf, dstBuf);
// clear delta data buf
taosArrayClearP(bm->pAdd, taosMemoryFree);
taosArrayClearP(bm->pDel, taosMemoryFree);
taosMemoryFree(srcBuf);
taosMemoryFree(dstBuf);
taosMemoryFree(srcDir);
taosMemoryFree(dstDir);
return code;
}
SCfInit ginitDict[] = {
{"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
destroyFunc, encodeValueFunc, decodeValueFunc},
@ -151,8 +384,8 @@ int32_t copyFiles(const char* src, const char* dst) {
// opt later, just hard link
int32_t sLen = strlen(src);
int32_t dLen = strlen(dst);
char* absSrcPath = taosMemoryCalloc(1, sLen + 64);
char* absDstPath = taosMemoryCalloc(1, dLen + 64);
char* srcName = taosMemoryCalloc(1, sLen + 64);
char* dstName = taosMemoryCalloc(1, dLen + 64);
TdDirPtr pDir = taosOpenDir(src);
if (pDir == NULL) return 0;
@ -162,22 +395,22 @@ int32_t copyFiles(const char* src, const char* dst) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
sprintf(absSrcPath, "%s%s%s", src, TD_DIRSEP, name);
sprintf(absDstPath, "%s%s%s", dst, TD_DIRSEP, name);
sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name);
sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name);
if (!taosDirEntryIsDir(de)) {
code = taosCopyFile(absSrcPath, absDstPath);
code = taosCopyFile(srcName, dstName);
if (code == -1) {
goto _err;
}
}
memset(absSrcPath, 0, sLen + 64);
memset(absDstPath, 0, dLen + 64);
memset(srcName, 0, sLen + 64);
memset(dstName, 0, dLen + 64);
}
_err:
taosMemoryFreeClear(absSrcPath);
taosMemoryFreeClear(absDstPath);
taosMemoryFreeClear(srcName);
taosMemoryFreeClear(dstName);
taosCloseDir(&pDir);
return code >= 0 ? 0 : -1;
}
@ -219,6 +452,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
return 0;
}
void* streamBackendInit(const char* streamPath, int64_t chkpId) {
char* backendPath = NULL;
int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath);
@ -405,74 +639,75 @@ void streamBackendHandleCleanup(void* arg) {
int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) {
SStreamMeta* pMeta = arg;
taosWLockLatch(&pMeta->checkpointDirLock);
taosWLockLatch(&pMeta->chkpDirLock);
int64_t tc = 0;
int32_t sz = taosArrayGetSize(pMeta->checkpointSaved);
int32_t sz = taosArrayGetSize(pMeta->chkpSaved);
if (sz <= 0) {
taosWUnLockLatch(&pMeta->chkpDirLock);
return -1;
} else {
tc = *(int64_t*)taosArrayGetLast(pMeta->checkpointSaved);
tc = *(int64_t*)taosArrayGetLast(pMeta->chkpSaved);
}
taosArrayPush(pMeta->checkpointInUse, &tc);
taosArrayPush(pMeta->chkpInUse, &tc);
*checkpoint = tc;
taosWUnLockLatch(&pMeta->checkpointDirLock);
taosWUnLockLatch(&pMeta->chkpDirLock);
return 0;
}
/*
* checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--|
* checkpointInUse: |--cp2--|--cp4--|
* checkpointInUse is doing translation, cannot del until
* chkpInUse: |--cp2--|--cp4--|
* chkpInUse is doing translation, cannot del until
* replication is finished
*/
int32_t delObsoleteCheckpoint(void* arg, const char* path) {
SStreamMeta* pMeta = arg;
taosWLockLatch(&pMeta->checkpointDirLock);
taosWLockLatch(&pMeta->chkpDirLock);
SArray* checkpointDel = taosArrayInit(10, sizeof(int64_t));
SArray* checkpointDup = taosArrayInit(10, sizeof(int64_t));
SArray* chkpDel = taosArrayInit(10, sizeof(int64_t));
SArray* chkpDup = taosArrayInit(10, sizeof(int64_t));
int64_t minId = 0;
if (taosArrayGetSize(pMeta->checkpointInUse) >= 1) {
minId = *(int64_t*)taosArrayGet(pMeta->checkpointInUse, 0);
if (taosArrayGetSize(pMeta->chkpInUse) >= 1) {
minId = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0);
for (int i = 0; i < taosArrayGetSize(pMeta->checkpointSaved); i++) {
int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i);
for (int i = 0; i < taosArrayGetSize(pMeta->chkpSaved); i++) {
int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
if (id >= minId) {
taosArrayPush(checkpointDup, &id);
taosArrayPush(chkpDup, &id);
} else {
taosArrayPush(checkpointDel, &id);
taosArrayPush(chkpDel, &id);
}
}
} else {
int32_t sz = taosArrayGetSize(pMeta->checkpointSaved);
int32_t dsz = sz - pMeta->checkpointCap; // del size
int32_t sz = taosArrayGetSize(pMeta->chkpSaved);
int32_t dsz = sz - pMeta->chkpCap; // del size
for (int i = 0; i < dsz; i++) {
int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i);
taosArrayPush(checkpointDel, &id);
int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
taosArrayPush(chkpDel, &id);
}
for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) {
int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i);
taosArrayPush(checkpointDup, &id);
int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
taosArrayPush(chkpDup, &id);
}
}
taosArrayDestroy(pMeta->checkpointSaved);
pMeta->checkpointSaved = checkpointDup;
taosArrayDestroy(pMeta->chkpSaved);
pMeta->chkpSaved = chkpDup;
taosWUnLockLatch(&pMeta->checkpointDirLock);
taosWUnLockLatch(&pMeta->chkpDirLock);
for (int i = 0; i < taosArrayGetSize(checkpointDel); i++) {
int64_t id = *(int64_t*)taosArrayGet(checkpointDel, i);
for (int i = 0; i < taosArrayGetSize(chkpDel); i++) {
int64_t id = *(int64_t*)taosArrayGet(chkpDel, i);
char tbuf[256] = {0};
sprintf(tbuf, "%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, id);
if (taosIsDir(tbuf)) {
taosRemoveDir(tbuf);
}
}
taosArrayDestroy(checkpointDel);
taosArrayDestroy(chkpDel);
return 0;
}
@ -487,16 +722,21 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
int32_t code = 0;
int32_t len = strlen(pMeta->path) + 30;
char* checkpointPath = taosMemoryCalloc(1, len);
sprintf(checkpointPath, "%s%s%s", pMeta->path, TD_DIRSEP, "checkpoints");
char* chkpPath = taosMemoryCalloc(1, len);
sprintf(chkpPath, "%s%s%s", pMeta->path, TD_DIRSEP, "checkpoints");
if (!taosDirExist(checkpointPath)) {
if (!taosDirExist(chkpPath)) {
// no checkpoint, nothing to load
taosMemoryFree(chkpPath);
return 0;
}
TdDirPtr pDir = taosOpenDir(checkpointPath);
if (pDir == NULL) return 0;
TdDirPtr pDir = taosOpenDir(chkpPath);
if (pDir == NULL) {
taosMemoryFree(chkpPath);
return 0;
}
TdDirEntryPtr de = NULL;
SArray* suffix = taosArrayInit(4, sizeof(int64_t));
@ -520,12 +760,12 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
for (int i = 0; i < taosArrayGetSize(suffix); i++) {
int64_t id = *(int64_t*)taosArrayGet(suffix, i);
taosArrayPush(pMeta->checkpointSaved, &id);
taosArrayPush(pMeta->chkpSaved, &id);
}
taosArrayDestroy(suffix);
taosCloseDir(&pDir);
taosMemoryFree(checkpointPath);
taosMemoryFree(chkpPath);
return 0;
}
int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
@ -572,9 +812,9 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
}
rocksdb_checkpoint_object_destroy(cp);
}
taosWLockLatch(&pMeta->checkpointDirLock);
taosArrayPush(pMeta->checkpointSaved, &checkpointId);
taosWUnLockLatch(&pMeta->checkpointDirLock);
taosWLockLatch(&pMeta->chkpDirLock);
taosArrayPush(pMeta->chkpSaved, &checkpointId);
taosWUnLockLatch(&pMeta->chkpDirLock);
delObsoleteCheckpoint(arg, path);
@ -624,9 +864,9 @@ int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf,
return ret;
}
}
int streamStateValueIsStale(char* vv) {
int streamStateValueIsStale(char* v) {
int64_t ts = 0;
taosDecodeFixedI64(vv, &ts);
taosDecodeFixedI64(v, &ts);
return (ts != 0 && ts < taosGetTimestampMs()) ? 1 : 0;
}
int iterValueIsStale(rocksdb_iterator_t* iter) {
@ -732,8 +972,8 @@ int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char
return stateSessionKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
}
int stateSessionKeyEncode(void* ses, char* buf) {
SStateSessionKey* sess = ses;
int stateSessionKeyEncode(void* k, char* buf) {
SStateSessionKey* sess = k;
int len = 0;
len += taosEncodeFixedI64((void**)&buf, sess->key.win.skey);
len += taosEncodeFixedI64((void**)&buf, sess->key.win.ekey);
@ -741,8 +981,8 @@ int stateSessionKeyEncode(void* ses, char* buf) {
len += taosEncodeFixedI64((void**)&buf, sess->opNum);
return len;
}
int stateSessionKeyDecode(void* ses, char* buf) {
SStateSessionKey* sess = ses;
int stateSessionKeyDecode(void* k, char* buf) {
SStateSessionKey* sess = k;
int len = 0;
char* p = buf;
@ -957,33 +1197,23 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) {
SStreamValue key = {0};
char* p = value;
if (streamStateValueIsStale(p)) {
if (dest != NULL) *dest = NULL;
return -1;
goto _EXCEPT;
}
p = taosDecodeFixedI64(p, &key.unixTimestamp);
p = taosDecodeFixedI32(p, &key.len);
if (vlen != (sizeof(int64_t) + sizeof(int32_t) + key.len)) {
if (dest != NULL) *dest = NULL;
qError("vlen: %d, read len: %d", vlen, key.len);
return -1;
goto _EXCEPT;
}
if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len);
if (key.len == 0) {
key.data = NULL;
} else {
p = taosDecodeBinary(p, (void**)&(key.data), key.len);
}
if (ttl != NULL) {
int64_t now = taosGetTimestampMs();
*ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - now;
}
if (dest != NULL) {
*dest = key.data;
} else {
taosMemoryFree(key.data);
}
if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs();
return key.len;
_EXCEPT:
if (dest != NULL) *dest = NULL;
if (ttl != NULL) *ttl = 0;
return -1;
}
const char* compareDefaultName(void* arg) {
@ -1097,15 +1327,17 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
} else {
qDebug("succ to open rocksdb cf");
}
// close default cf
// close default cf and destroy default cfOpts
if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) rocksdb_column_family_handle_destroy(cfHandle[0]);
rocksdb_options_destroy(cfOpts[0]);
handle->db = db;
static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
for (int i = 0; i < nCf; i++) {
char* cf = cfs[i];
if (i == 0) continue;
if (i == 0) continue; // skip default column family, not set opt
char funcname[64] = {0};
if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
char idstr[128] = {0};
@ -1127,7 +1359,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
inst->dbOpt = handle->dbOpt;
// rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*));
} else {
inst = *pInst;
@ -1181,8 +1412,8 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
taosAcquireRef(streamBackendId, pState->streamBackendRid);
SBackendWrapper* handle = backend;
SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper));
taosThreadMutexLock(&handle->cfMutex);
taosThreadMutexLock(&handle->cfMutex);
RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
if (ppInst != NULL && *ppInst != NULL) {
RocksdbCfInst* inst = *ppInst;
@ -2355,9 +2586,7 @@ void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
}
int32_t streamDefaultIterValid_rocksdb(void* iter) {
SStreamStateCur* pCur = iter;
bool val = rocksdb_iter_valid(pCur->iter);
return val ? 1 : 0;
return rocksdb_iter_valid(pCur->iter) ? 1 : 0;
}
void streamDefaultIterSeek_rocksdb(void* iter, const char* key) {
SStreamStateCur* pCur = iter;
@ -2373,13 +2602,16 @@ char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) {
}
char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) {
SStreamStateCur* pCur = iter;
char* ret = NULL;
int32_t vlen = 0;
char* dst = NULL;
const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vlen);
if (decodeValueFunc((void*)vval, vlen, NULL, &dst) < 0) {
const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vlen);
*len = decodeValueFunc((void*)val, vlen, NULL, &ret);
if (*len < 0) {
return NULL;
}
return dst;
return ret;
}
// batch func
void* streamStateCreateBatch() {
@ -2434,6 +2666,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
if (tmpBuf == NULL) {
taosMemoryFree(ttlV);
}
{
char tbuf[256] = {0};
ginitDict[cfIdx].toStrFunc((void*)key, tbuf);

View File

@ -36,14 +36,14 @@ void streamMetaCleanup() {
taosCloseRef(streamBackendCfWrapperId);
}
int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId) {
int32_t code = 0;
// int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId) {
// int32_t code = 0;
int32_t nTask = taosHashGetSize(pMeta->pTasks);
assert(nTask == 0);
// int32_t nTask = taosHashGetSize(pMeta->pTasks);
// assert(nTask == 0);
return code;
}
// return code;
// }
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
@ -91,12 +91,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->pTaskBackendUnique =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t));
pMeta->checkpointInUse = taosArrayInit(4, sizeof(int64_t));
pMeta->checkpointCap = 8;
taosInitRWLatch(&pMeta->checkpointDirLock);
pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t));
pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t));
pMeta->chkpCap = 8;
taosInitRWLatch(&pMeta->chkpDirLock);
int64_t chkpId = streamGetLatestCheckpointId(pMeta);
pMeta->chkpId = chkpId;
pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId);
if (pMeta->streamBackend == NULL) {
@ -109,7 +110,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
terrno = TAOS_SYSTEM_ERROR(code);
goto _err;
}
taosInitRWLatch(&pMeta->lock);
taosThreadMutexInit(&pMeta->backendMutex, NULL);
@ -128,48 +128,65 @@ _err:
return NULL;
}
void streamMetaReopen(SStreamMeta** ppMeta) {
SStreamMeta* pMeta = *ppMeta;
int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
// stop all running tasking and reopen later
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamMeta* pNewMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
pNewMeta->path = taosStrdup(pMeta->path);
pNewMeta->vgId = pMeta->vgId;
pNewMeta->walScanCounter = 0;
pNewMeta->ahandle = pMeta->ahandle;
pNewMeta->expandFunc = pMeta->expandFunc;
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->schedTimer) {
taosTmrStop(pTask->schedTimer);
pTask->schedTimer = NULL;
}
*ppMeta = pNewMeta;
if (pTask->launchTaskTimer) {
taosTmrStop(pTask->launchTaskTimer);
pTask->launchTaskTimer = NULL;
}
streamMetaClose(pMeta);
tFreeStreamTask(pTask);
}
// tdbAbort(pMeta->db, pMeta->txn);
// tdbTbClose(pMeta->pTaskDb);
// tdbTbClose(pMeta->pCheckpointDb);
// tdbClose(pMeta->db);
// close stream backend
streamBackendCleanup(pMeta->streamBackend);
taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
pMeta->streamBackendRid = -1;
pMeta->streamBackend = NULL;
// void* pIter = NULL;
// while (1) {
// pIter = taosHashIterate(pMeta->pTasks, pIter);
// if (pIter == NULL) {
// break;
// }
char* defaultPath = taosMemoryCalloc(1, strlen(pMeta->path) + 64);
sprintf(defaultPath, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
taosRemoveDir(defaultPath);
// SStreamTask* pTask = *(SStreamTask**)pIter;
// if (pTask->schedTimer) {
// taosTmrStop(pTask->schedTimer);
// pTask->schedTimer = NULL;
// }
char* newPath = taosMemoryCalloc(1, strlen(pMeta->path) + 64);
sprintf(newPath, "%s%s%s", pMeta->path, TD_DIRSEP, "received");
// if (pTask->launchTaskTimer) {
// taosTmrStop(pTask->launchTaskTimer);
// pTask->launchTaskTimer = NULL;
// }
if (taosRenameFile(newPath, defaultPath) < 0) {
taosMemoryFree(defaultPath);
taosMemoryFree(newPath);
return -1;
}
// tFreeStreamTask(pTask);
// }
pMeta->streamBackend = streamBackendInit(pMeta->path, 0);
if (pMeta->streamBackend == NULL) {
return -1;
}
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
// taosHashClear(pMeta->pTasks);
// taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
taosHashClear(pMeta->pTasks);
taosArrayClear(pMeta->pTaskList);
taosHashClear(pMeta->pTaskBackendUnique);
taosArrayClear(pMeta->chkpSaved);
taosArrayClear(pMeta->chkpInUse);
return 0;
}
void streamMetaClose(SStreamMeta* pMeta) {
tdbAbort(pMeta->db, pMeta->txn);
@ -205,8 +222,8 @@ void streamMetaClose(SStreamMeta* pMeta) {
taosThreadMutexDestroy(&pMeta->backendMutex);
taosHashCleanup(pMeta->pTaskBackendUnique);
taosArrayDestroy(pMeta->checkpointSaved);
taosArrayDestroy(pMeta->checkpointInUse);
taosArrayDestroy(pMeta->chkpSaved);
taosArrayDestroy(pMeta->chkpInUse);
taosMemoryFree(pMeta);
}

View File

@ -53,7 +53,7 @@ typedef struct {
int32_t retryMinInterval; // retry init interval
int32_t retryStepFactor; // retry interval factor
int32_t retryMaxInterval; // retry max interval
int32_t retryMaxTimouet;
int32_t retryMaxTimeout;
int32_t failFastThreshold;
int32_t failFastInterval;

View File

@ -55,7 +55,7 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->retryMinInterval = pInit->retryMinInterval; // retry init interval
pRpc->retryStepFactor = pInit->retryStepFactor;
pRpc->retryMaxInterval = pInit->retryMaxInterval;
pRpc->retryMaxTimouet = pInit->retryMaxTimouet;
pRpc->retryMaxTimeout = pInit->retryMaxTimeout;
pRpc->failFastThreshold = pInit->failFastThreshold;
pRpc->failFastInterval = pInit->failFastInterval;

View File

@ -2256,7 +2256,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
pCtx->retryMinInterval = pTransInst->retryMinInterval;
pCtx->retryMaxInterval = pTransInst->retryMaxInterval;
pCtx->retryStepFactor = pTransInst->retryStepFactor;
pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet;
pCtx->retryMaxTimeout = pTransInst->retryMaxTimeout;
pCtx->retryInitTimestamp = taosGetTimestampMs();
pCtx->retryNextInterval = pCtx->retryMinInterval;
pCtx->retryStep = 0;

View File

@ -191,7 +191,7 @@ void* taosArrayGet(const SArray* pArray, size_t index) {
}
if (index >= pArray->size) {
uError("index is out of range, current:%"PRIzu" max:%d", index, pArray->capacity);
uError("index is out of range, current:%" PRIzu " max:%d", index, pArray->capacity);
return NULL;
}
@ -360,6 +360,23 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) {
pArray->size = 0;
}
void taosArrayClearP(SArray* pArray, void (*fp)(void*)) {
// if (pArray == NULL) return;
// if (fp == NULL) {
// pArray->size = 0;
// return;
// }
// for (int32_t i = 0; i < pArray->size; ++i) {
// fp(TARRAY_GET_ELEM(pArray, i));
// }
if (pArray) {
for (int32_t i = 0; i < pArray->size; i++) {
fp(*(void**)TARRAY_GET_ELEM(pArray, i));
}
}
taosArrayClear(pArray);
}
void* taosArrayDestroy(SArray* pArray) {
if (pArray) {
@ -492,7 +509,7 @@ void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t
// order array<type *>
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param) {
taosqsort(pArray->pData, pArray->size, pArray->elemSize, param, fn);
// taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);
// taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);
}
void taosArraySwap(SArray* a, SArray* b) {