fix:[TS-4728]transform strore data if data is too big

This commit is contained in:
wangmm0220 2024-04-29 17:22:40 +08:00
parent 145837ab67
commit 1f85cb42e6
4 changed files with 162 additions and 13 deletions

View File

@ -136,6 +136,7 @@ int32_t tqMetaGetHandle(STQ* pTq, const char* key);
int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle); int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle);
STqOffsetStore* tqOffsetOpen(STQ* pTq); STqOffsetStore* tqOffsetOpen(STQ* pTq);
int32_t tqMetaTransform(STQ* pTq);
void tqOffsetClose(STqOffsetStore*); void tqOffsetClose(STqOffsetStore*);
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey); STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset); int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset);

View File

@ -86,15 +86,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
} }
int32_t tqInitialize(STQ* pTq) { int32_t tqInitialize(STQ* pTq) {
if (tqMetaOpen(pTq) < 0) {
return -1;
}
pTq->pOffsetStore = tqOffsetOpen(pTq);
if (pTq->pOffsetStore == NULL) {
return -1;
}
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback); pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback);
if (pTq->pStreamMeta == NULL) { if (pTq->pStreamMeta == NULL) {
@ -102,6 +93,23 @@ int32_t tqInitialize(STQ* pTq) {
} }
/*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta); /*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta);
if (tqMetaTransform(pTq) < 0) {
return -1;
}
pTq->pOffsetStore = tqOffsetOpen(pTq);
if (pTq->pOffsetStore == NULL) {
return -1;
}
if (tqMetaRestoreCheckInfo(pTq) < 0) {
return -1;
}
pTq->pOffsetStore = tqOffsetOpen(pTq);
if (pTq->pOffsetStore == NULL) {
return -1;
}
return 0; return 0;
} }

View File

@ -88,10 +88,6 @@ int32_t tqMetaOpen(STQ* pTq) {
return -1; return -1;
} }
if (tqMetaRestoreCheckInfo(pTq) < 0) {
return -1;
}
return 0; return 0;
} }
@ -378,6 +374,146 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
} }
static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pExecStoreOld, TTB* pExecStoreNew){
TBC* pCur = NULL;
if (tdbTbcOpen(pExecStoreOld, &pCur, NULL) < 0) {
return -1;
}
TXN* txn;
if (tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1;
}
void* pKey = NULL;
int kLen = 0;
void* pVal = NULL;
int vLen = 0;
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
if (tdbTbUpsert(pExecStoreNew, pKey, kLen, pVal, vLen, txn) < 0) {
tqError("transform sub info error");
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
return -1;
}
}
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
if (tdbCommit(pMetaDB, txn) < 0) {
return -1;
}
if (tdbPostCommit(pMetaDB, txn) < 0) {
return -1;
}
return 0;
}
int32_t tqMetaTransform(STQ* pTq) {
int32_t len = strlen(pTq->path) + 64;
char* maindb = taosMemoryCalloc(1, len);
sprintf(maindb, "%s%s%s", pTq->path, TD_DIRSEP, TDB_MAINDB_NAME);
if(!taosCheckExistFile(maindb)){
taosMemoryFree(maindb);
char* tpath = taosMemoryCalloc(1, len);
if(tpath == NULL){
return -1;
}
sprintf(tpath, "%s%s%s", pTq->path, TD_DIRSEP, "subscribe");
taosMemoryFree(pTq->path);
pTq->path = tpath;
return tqMetaOpen(pTq);
}
int32_t code = 0;
TDB* pMetaDB = NULL;
TTB* pExecStore = NULL;
TTB* pCheckStore = NULL;
char* offsetNew = NULL;
char* offset = tqOffsetBuildFName(pTq->path, 0);
if(offset == NULL){
code = -1;
goto END;
}
if (tdbOpen(pTq->path, 16 * 1024, 1, &pMetaDB, 0, 0, NULL) < 0) {
code = -1;
goto END;
}
if (tdbTbOpen("tq.db", -1, -1, NULL, pMetaDB, &pExecStore, 0) < 0) {
code = -1;
goto END;
}
if (tdbTbOpen("tq.check.db", -1, -1, NULL, pMetaDB, &pCheckStore, 0) < 0) {
code = -1;
goto END;
}
char* tpath = taosMemoryCalloc(1, len);
if(tpath == NULL){
code = -1;
goto END;
}
sprintf(tpath, "%s%s%s", pTq->path, TD_DIRSEP, "subscribe");
taosMemoryFree(pTq->path);
pTq->path = tpath;
if (tqMetaOpen(pTq) < 0) {
code = -1;
goto END;
}
if( tqMetaTransformInfo(pTq->pMetaDB, pExecStore, pTq->pExecStore) < 0){
code = -1;
goto END;
}
if(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore) < 0){
code = -1;
goto END;
}
tdbTbClose(pExecStore);
pExecStore = NULL;
tdbTbClose(pCheckStore);
pCheckStore = NULL;
tdbClose(pMetaDB);
pMetaDB = NULL;
offsetNew = tqOffsetBuildFName(pTq->path, 0);
if(offsetNew == NULL){
code = -1;
goto END;
}
if(taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0){
tqError("copy offset file error");
code = -1;
goto END;
}
taosRemoveFile(maindb);
taosRemoveFile(offset);
END:
taosMemoryFree(maindb);
taosMemoryFree(offset);
taosMemoryFree(offsetNew);
tdbTbClose(pExecStore);
tdbTbClose(pCheckStore);
tdbClose(pMetaDB);
return code;
}
//int32_t tqMetaRestoreHandle(STQ* pTq) { //int32_t tqMetaRestoreHandle(STQ* pTq) {
// int code = 0; // int code = 0;
// TBC* pCur = NULL; // TBC* pCur = NULL;

View File

@ -25,6 +25,10 @@ struct STqOffsetStore {
char* tqOffsetBuildFName(const char* path, int32_t fVer) { char* tqOffsetBuildFName(const char* path, int32_t fVer) {
int32_t len = strlen(path); int32_t len = strlen(path);
char* fname = taosMemoryCalloc(1, len + 40); char* fname = taosMemoryCalloc(1, len + 40);
if(fname == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
snprintf(fname, len + 40, "%s/offset-ver%d", path, fVer); snprintf(fname, len + 40, "%s/offset-ver%d", path, fVer);
return fname; return fname;
} }