fix transfer crash

This commit is contained in:
yihaoDeng 2023-10-17 18:19:52 +08:00
parent 77542cc461
commit 2004c1a346
9 changed files with 58 additions and 32 deletions

View File

@ -727,7 +727,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta);
void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t chkpId, int64_t* ref);
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);

View File

@ -738,11 +738,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver);
if (code != TSDB_CODE_SUCCESS) return code;
pTask->pBackend = streamMetaGetBackendByTaskKey(pTq->pStreamMeta, (char*)pTask->id.idStr, pTask->checkpointingId,
&pTask->backendRefId);
// code = streamTaskSetDb(pTq->pStreamMeta, pTask);
// taskDbUpdateChkpId(pTask->pBackend, pTask->checkpointingId);
if (pTask->pBackend == NULL) return -1;
// if (pTask->pBackend == NULL) return -1;
streamTaskOpenAllUpstreamInput(pTask);

View File

@ -242,11 +242,8 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
goto _err;
}
tDecoderClear(&decoder);
// tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn)
taosWLockLatch(&pTq->pStreamMeta->lock);
int64_t key[2] = {task.streamId, task.taskId};
taosWLockLatch(&pTq->pStreamMeta->lock);
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr),
nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) {

View File

@ -68,6 +68,9 @@ typedef struct {
char* path;
int64_t refId;
void* pTask;
int64_t streamId;
int64_t taskId;
int64_t chkpId;
SArray* chkpSaved;
SArray* chkpInUse;
@ -184,7 +187,7 @@ int32_t streamBackendTriggerChkp(void* pMeta, char* dst);
int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId);
int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId);
int32_t taskDbBuildSnap(void* arg);
int32_t taskDbBuildSnap(void* arg, SArray* pSnap);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif

View File

@ -44,6 +44,7 @@ typedef struct {
int64_t streamId;
int64_t taskId;
int64_t chkpId;
char* dbPrefixPath;
} SStreamTaskSnap;
extern SStreamGlobalEnv streamEnv;
extern int32_t streamBackendId;

View File

@ -1004,7 +1004,7 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI
return 0;
}
int32_t taskDbBuildSnap(void* arg) {
int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
SStreamMeta* pMeta = arg;
void* pIter = taosHashIterate(pMeta->pTaskDbUnique, NULL);
int32_t code = 0;
@ -1017,6 +1017,13 @@ int32_t taskDbBuildSnap(void* arg) {
taskDbRemoveRef(pTaskDb);
pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
SStreamTask* pTask = pTaskDb->pTask;
SStreamTaskSnap snap = {.streamId = pTask->id.streamId,
.taskId = pTask->id.taskId,
.chkpId = pTaskDb->chkpId,
.dbPrefixPath = taosStrdup(pTaskDb->path)};
taosArrayPush(pSnap, &snap);
}
return 0;
}

View File

@ -222,28 +222,40 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
return 0;
}
void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t chkpId, int64_t* ref) {
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg) {
SStreamTask* pTask = arg;
char* key = (char*)pTask->id.idStr;
int64_t chkpId = pTask->checkpointingId;
taosThreadMutexLock(&pMeta->backendMutex);
void** ppBackend = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key));
if (ppBackend != NULL && *ppBackend != NULL) {
taskDbAddRef(*ppBackend);
*ref = ((STaskDbWrapper*)*ppBackend)->refId;
STaskDbWrapper* pBackend = *ppBackend;
pTask->backendRefId = pBackend->refId;
pTask->pBackend = pBackend;
taosThreadMutexUnlock(&pMeta->backendMutex);
return *ppBackend;
return 0;
}
void* pBackend = taskDbOpen(pMeta->path, key, chkpId);
STaskDbWrapper* pBackend = taskDbOpen(pMeta->path, key, chkpId);
if (pBackend == NULL) {
taosThreadMutexUnlock(&pMeta->backendMutex);
return NULL;
return -1;
}
int64_t tref = taosAddRef(taskDbWrapperId, pBackend);
*ref = tref;
((STaskDbWrapper*)pBackend)->refId = tref;
pTask->backendRefId = tref;
pTask->pBackend = pBackend;
pBackend->refId = tref;
pBackend->pTask = pTask;
taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
taosThreadMutexUnlock(&pMeta->backendMutex);
return pBackend;
return 0;
}
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {
int32_t code = -1;

View File

@ -139,7 +139,7 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
return taosOpenFile(fullname, opt);
}
int32_t streamTaskDbGetSnapInfo(void* arg, char* path) { return taskDbBuildSnap(arg); }
int32_t streamTaskDbGetSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); }
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
if (qDebugFlag & DEBUG_DEBUG) {
@ -236,16 +236,13 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
return 0;
}
int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) {
// SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
int32_t code = -1;
char* path = taosMemoryCalloc(1, strlen(metaPath) + 256);
char idstr[64] = {0};
sprintf(idstr, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)(pSnap->taskId));
sprintf(path, "%s%s%s%s%s%s%s%" PRId64 "", metaPath, TD_DIRSEP, idstr, TD_DIRSEP, "checkpoints", TD_DIRSEP,
"checkpoint", pSnap->chkpId);
if (taosIsDir(path)) {
char* path = taosMemoryCalloc(1, strlen(pSnap->dbPrefixPath) + 256);
// char idstr[64] = {0};
sprintf(path, "%s%s%s%s%s%" PRId64 "", pSnap->dbPrefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint",
pSnap->chkpId);
if (!taosIsDir(path)) {
goto _ERROR;
}
@ -261,7 +258,7 @@ int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBacke
}
snapFileDebugInfo(pSnapFile);
path = NULL;
code = 0;
_ERROR:
@ -294,8 +291,8 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) {
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) {
// impl later
SArray* pSnapSet = NULL;
int32_t code = streamTaskDbGetSnapInfo(pMeta, path);
SArray* pSnapSet = taosArrayInit(4, sizeof(SStreamTaskSnap));
int32_t code = streamTaskDbGetSnapInfo(pMeta, path, pSnapSet);
if (code != 0) {
return -1;
}
@ -310,6 +307,11 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
ASSERT(code == 0);
taosArrayPush(pDbSnapSet, &snapFile);
}
for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i);
taosMemoryFree(pSnap->dbPrefixPath);
}
taosArrayDestroy(pSnapSet);
pHandle->pDbSnapSet = pDbSnapSet;
pHandle->currIdx = 0;
@ -389,6 +391,7 @@ _NEXT:
item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx);
}
}
item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
qDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx);

View File

@ -357,6 +357,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
if (pTask->pBackend) {
taskDbRemoveRef(pTask->pBackend);
pTask->pBackend = NULL;
}
taosThreadMutexDestroy(&pTask->lock);
@ -390,6 +391,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
streamTaskInitTokenBucket(&pTask->tokenBucket, 150, 100);
taosThreadMutexInit(&pTask->lock, NULL);
if (streamTaskSetDb(pMeta, pTask) != 0) {
return -1;
}
streamTaskOpenAllUpstreamInput(pTask);
return TSDB_CODE_SUCCESS;