Merge pull request #24094 from taosdata/fix/TD-27905

Fix/td 27905
This commit is contained in:
Haojun Liao 2023-12-16 18:07:17 +08:00 committed by GitHub
commit 9bd80cf0f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 31 additions and 7 deletions

View File

@ -79,6 +79,8 @@ typedef struct {
TdThreadRwlock chkpDirLock;
int64_t dataWritten;
void* pMeta;
} STaskDbWrapper;
typedef struct SDbChkp {

View File

@ -87,10 +87,10 @@ struct SStreamQueue {
int8_t status;
};
extern void* streamTimer;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
extern int32_t taskDbWrapperId;
extern void* streamTimer;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
extern int32_t taskDbWrapperId;
void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration);
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
@ -156,6 +156,8 @@ void* streamQueueNextItem(SStreamQueue* pQueue);
void streamFreeQitem(SStreamQueueItem* data);
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
void streamMetaRemoveDB(void* arg, char* key);
typedef enum UPLOAD_TYPE {
UPLOAD_DISABLE = -1,
UPLOAD_S3 = 0,

View File

@ -982,8 +982,10 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI
}
int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
SStreamMeta* pMeta = arg;
void* pIter = taosHashIterate(pMeta->pTaskDbUnique, NULL);
int32_t code = 0;
taosThreadMutexLock(&pMeta->backendMutex);
void* pIter = taosHashIterate(pMeta->pTaskDbUnique, NULL);
int32_t code = 0;
while (pIter) {
STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter;
@ -1000,6 +1002,8 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
taosArrayPush(pSnap, &snap);
pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
}
taosThreadMutexUnlock(&pMeta->backendMutex);
return code;
}
int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) {
@ -1810,6 +1814,10 @@ STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId) {
void taskDbDestroy(void* pDb, bool flush) {
STaskDbWrapper* wrapper = pDb;
if (wrapper == NULL) return;
streamMetaRemoveDB(wrapper->pMeta, wrapper->idstr);
qDebug("succ to destroy stream backend:%p", wrapper);
int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);

View File

@ -250,9 +250,11 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) {
taskDbAddRef(*ppBackend);
STaskDbWrapper* pBackend = *ppBackend;
pBackend->pMeta = pMeta;
pTask->backendRefId = pBackend->refId;
pTask->pBackend = pBackend;
taosThreadMutexUnlock(&pMeta->backendMutex);
stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend);
@ -270,6 +272,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) {
pTask->pBackend = pBackend;
pBackend->refId = tref;
pBackend->pTask = pTask;
pBackend->pMeta = pMeta;
taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
taosThreadMutexUnlock(&pMeta->backendMutex);
@ -277,6 +280,15 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) {
stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend);
return 0;
}
void streamMetaRemoveDB(void* arg, char* key) {
if (arg == NULL || key == NULL) return;
SStreamMeta* pMeta = arg;
taosThreadMutexLock(&pMeta->backendMutex);
taosHashRemove(pMeta->pTaskDbUnique, key, strlen(key));
taosThreadMutexUnlock(&pMeta->backendMutex);
}
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {
int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
@ -794,7 +806,7 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
}
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL;
TBC* pCur = NULL;
void* pKey = NULL;
int32_t kLen = 0;
void* pVal = NULL;