enh(tmq): offset commit only when needed
This commit is contained in:
parent
bc469f7f7b
commit
118e5bf5d3
|
@ -17,9 +17,9 @@
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
|
||||||
struct STqOffsetStore {
|
struct STqOffsetStore {
|
||||||
char* fname;
|
|
||||||
STQ* pTq;
|
STQ* pTq;
|
||||||
SHashObj* pHash; // SHashObj<subscribeKey, offset>
|
SHashObj* pHash; // SHashObj<subscribeKey, offset>
|
||||||
|
int8_t needCommit;
|
||||||
};
|
};
|
||||||
|
|
||||||
char* tqOffsetBuildFName(const char* path, int32_t fVer) {
|
char* tqOffsetBuildFName(const char* path, int32_t fVer) {
|
||||||
|
@ -74,6 +74,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pStore->pTq = pTq;
|
pStore->pTq = pTq;
|
||||||
|
pStore->needCommit = 0;
|
||||||
pTq->pOffsetStore = pStore;
|
pTq->pOffsetStore = pStore;
|
||||||
|
|
||||||
pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
|
pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
|
||||||
|
@ -100,6 +101,7 @@ STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
|
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
|
||||||
|
pStore->needCommit = 1;
|
||||||
return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
|
return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,14 +110,23 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
|
int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
|
||||||
|
if (!pStore->needCommit) return 0;
|
||||||
// TODO file name should be with a newer version
|
// TODO file name should be with a newer version
|
||||||
char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
|
char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
|
||||||
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||||
taosMemoryFree(fname);
|
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
|
||||||
|
int32_t err = terrno;
|
||||||
|
const char* errStr = tstrerror(err);
|
||||||
|
int32_t sysErr = errno;
|
||||||
|
const char* sysErrStr = strerror(errno);
|
||||||
|
tqError("vgId:%d, cannot open file %s when commit offset since %s", pStore->pTq->pVnode->config.vgId, fname,
|
||||||
|
sysErrStr);
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
taosMemoryFree(fname);
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pStore->pHash, pIter);
|
pIter = taosHashIterate(pStore->pHash, pIter);
|
||||||
|
@ -152,5 +163,6 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
|
||||||
}
|
}
|
||||||
// close and rename file
|
// close and rename file
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
|
pStore->needCommit = 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue