|
|
|
@ -62,7 +62,12 @@ static void streamMetaEnvInit() {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void streamMetaInit() { (void)taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
|
|
|
|
|
void streamMetaInit() {
|
|
|
|
|
int32_t code = taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("failed to init stream Meta model, code:%s", tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void streamMetaCleanup() {
|
|
|
|
|
taosCloseRef(streamBackendId);
|
|
|
|
@ -114,13 +119,17 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
|
|
|
|
|
|
|
|
|
|
p = taosHashGet(gMetaRefMgt.pTable, &vgId, sizeof(vgId));
|
|
|
|
|
if (p == NULL) {
|
|
|
|
|
SArray* list = taosArrayInit(8, sizeof(void*));
|
|
|
|
|
p = taosArrayPush(list, &rid);
|
|
|
|
|
SArray* pList = taosArrayInit(8, POINTER_BYTES);
|
|
|
|
|
if (pList == NULL) {
|
|
|
|
|
return terrno;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
p = taosArrayPush(pList, &rid);
|
|
|
|
|
if (p == NULL) {
|
|
|
|
|
return TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
code = taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*));
|
|
|
|
|
code = taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &pList, sizeof(void*));
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("vgId:%d failed to put into metaRef table, rid:%" PRId64, (int32_t)vgId, *rid);
|
|
|
|
|
return code;
|
|
|
|
@ -180,8 +189,13 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
|
|
|
|
|
|
|
|
|
|
code = tdbTbcMoveToFirst(pCur);
|
|
|
|
|
if (code) {
|
|
|
|
|
(void)tdbTbcClose(pCur);
|
|
|
|
|
stError("vgId:%d failed to open stream meta file cursor, not perform compatible check", pMeta->vgId);
|
|
|
|
|
stError("vgId:%d failed to open stream meta file cursor, not perform compatible check, code:%s", pMeta->vgId,
|
|
|
|
|
tstrerror(code));
|
|
|
|
|
code = tdbTbcClose(pCur);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("vgId:%d failed to close meta file cursor, code:%s", pMeta->vgId, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -209,7 +223,10 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
|
|
|
|
|
|
|
|
|
|
tdbFree(pKey);
|
|
|
|
|
tdbFree(pVal);
|
|
|
|
|
(void)tdbTbcClose(pCur);
|
|
|
|
|
code = tdbTbcClose(pCur);
|
|
|
|
|
if (code != 0) {
|
|
|
|
|
stError("vgId:%d failed to close meta file cursor, code:%s", pMeta->vgId, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -351,8 +368,8 @@ void streamMetaRemoveDB(void* arg, char* key) {
|
|
|
|
|
|
|
|
|
|
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId,
|
|
|
|
|
int64_t stage, startComplete_fn_t fn, SStreamMeta** p) {
|
|
|
|
|
*p = NULL;
|
|
|
|
|
int32_t code = 0;
|
|
|
|
|
QRY_PARAM_CHECK(p);
|
|
|
|
|
|
|
|
|
|
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
|
|
|
|
if (pMeta == NULL) {
|
|
|
|
@ -484,9 +501,26 @@ _err:
|
|
|
|
|
taosMemoryFree(pMeta->path);
|
|
|
|
|
if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap);
|
|
|
|
|
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
|
|
|
|
|
if (pMeta->pTaskDb) (void)tdbTbClose(pMeta->pTaskDb);
|
|
|
|
|
if (pMeta->pCheckpointDb) (void)tdbTbClose(pMeta->pCheckpointDb);
|
|
|
|
|
if (pMeta->db) (void)tdbClose(pMeta->db);
|
|
|
|
|
if (pMeta->pTaskDb) {
|
|
|
|
|
int32_t ret = tdbTbClose(pMeta->pTaskDb);
|
|
|
|
|
if (ret) {
|
|
|
|
|
stError("vgId:%d tdb failed close task db, code:%s", pMeta->vgId, tstrerror(ret));
|
|
|
|
|
}
|
|
|
|
|
pMeta->pTaskDb = NULL;
|
|
|
|
|
}
|
|
|
|
|
if (pMeta->pCheckpointDb) {
|
|
|
|
|
int32_t ret = tdbTbClose(pMeta->pCheckpointDb);
|
|
|
|
|
if (ret) {
|
|
|
|
|
stError("vgId:%d tdb failed close task checkpointDb, code:%s", pMeta->vgId, tstrerror(ret));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (pMeta->db) {
|
|
|
|
|
int32_t ret = tdbClose(pMeta->db);
|
|
|
|
|
if (ret) {
|
|
|
|
|
stError("vgId:%d tdb failed close meta db, code:%s", pMeta->vgId, tstrerror(ret));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
|
|
|
|
|
if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
|
|
|
|
|
if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
|
|
|
|
@ -532,7 +566,7 @@ void streamMetaClear(SStreamMeta* pMeta) {
|
|
|
|
|
// release the ref by timer
|
|
|
|
|
if (p->info.delaySchedParam != 0 && p->info.fillHistory == 0) { // one more ref in timer
|
|
|
|
|
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt);
|
|
|
|
|
(void)taosTmrStop(p->schedInfo.pDelayTimer);
|
|
|
|
|
streamTmrStop(p->schedInfo.pDelayTimer);
|
|
|
|
|
p->info.delaySchedParam = 0;
|
|
|
|
|
streamMetaReleaseTask(pMeta, p);
|
|
|
|
|
}
|
|
|
|
@ -567,7 +601,10 @@ void streamMetaClose(SStreamMeta* pMeta) {
|
|
|
|
|
if (pMeta == NULL) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
(void)taosRemoveRef(streamMetaId, pMeta->rid);
|
|
|
|
|
int32_t code = taosRemoveRef(streamMetaId, pMeta->rid);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("vgId:%d failed to remove ref:%d, code:%s", pMeta->vgId, pMeta->rid, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void streamMetaCloseImpl(void* arg) {
|
|
|
|
@ -576,6 +613,7 @@ void streamMetaCloseImpl(void* arg) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t code = 0;
|
|
|
|
|
int32_t vgId = pMeta->vgId;
|
|
|
|
|
stDebug("vgId:%d start to do-close stream meta", vgId);
|
|
|
|
|
|
|
|
|
@ -584,10 +622,22 @@ void streamMetaCloseImpl(void* arg) {
|
|
|
|
|
streamMetaWUnLock(pMeta);
|
|
|
|
|
|
|
|
|
|
// already log the error, ignore here
|
|
|
|
|
(void)tdbAbort(pMeta->db, pMeta->txn);
|
|
|
|
|
(void)tdbTbClose(pMeta->pTaskDb);
|
|
|
|
|
(void)tdbTbClose(pMeta->pCheckpointDb);
|
|
|
|
|
(void)tdbClose(pMeta->db);
|
|
|
|
|
code = tdbAbort(pMeta->db, pMeta->txn);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("vgId:%d failed to jump of trans for tdb, code:%s", vgId, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
code = tdbTbClose(pMeta->pTaskDb);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("vgId:%d failed to close taskDb, code:%s", vgId, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
code = tdbTbClose(pMeta->pCheckpointDb);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("vgId:%d failed to close checkpointDb, code:%s", vgId, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
code = tdbClose(pMeta->db);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("vgId:%d failed to close db, code:%s", vgId, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosArrayDestroy(pMeta->pTaskList);
|
|
|
|
|
taosArrayDestroy(pMeta->chkpSaved);
|
|
|
|
@ -611,7 +661,10 @@ void streamMetaCloseImpl(void* arg) {
|
|
|
|
|
bkdMgtDestroy(pMeta->bkdChkptMgt);
|
|
|
|
|
|
|
|
|
|
pMeta->role = NODE_ROLE_UNINIT;
|
|
|
|
|
(void)taosThreadRwlockDestroy(&pMeta->lock);
|
|
|
|
|
code = taosThreadRwlockDestroy(&pMeta->lock);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("vgId:%d destroy rwlock, code:%s", vgId, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosMemoryFree(pMeta);
|
|
|
|
|
stDebug("vgId:%d end to close stream meta", vgId);
|
|
|
|
@ -711,7 +764,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pTask->info.fillHistory == 0) {
|
|
|
|
|
(void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
|
|
|
|
int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
*pAdded = true;
|
|
|
|
@ -786,20 +839,26 @@ static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
|
|
|
|
|
int32_t code = 0;
|
|
|
|
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
|
|
|
|
(void)streamTaskSendCheckpointSourceRsp(pTask);
|
|
|
|
|
code = streamTaskSendCheckpointSourceRsp(pTask);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("s-task:%s vgId:%d failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, pTask->pMeta->vgId,
|
|
|
|
|
tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return 0;
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
|
|
|
|
SStreamTask* pTask = NULL;
|
|
|
|
|
int32_t vgId = pMeta->vgId;
|
|
|
|
|
int32_t code = 0;
|
|
|
|
|
STaskId id = {.streamId = streamId, .taskId = taskId};
|
|
|
|
|
|
|
|
|
|
// pre-delete operation
|
|
|
|
|
streamMetaWLock(pMeta);
|
|
|
|
|
|
|
|
|
|
STaskId id = {.streamId = streamId, .taskId = taskId};
|
|
|
|
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
|
|
|
|
if (ppTask) {
|
|
|
|
|
pTask = *ppTask;
|
|
|
|
@ -811,12 +870,16 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// handle the dropping event
|
|
|
|
|
(void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
|
|
|
|
|
code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("s-task:0x%" PRIx64 " failed to handle dropping event async, code:%s", id.taskId, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", vgId, taskId);
|
|
|
|
|
streamMetaWUnLock(pMeta);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
streamMetaWUnLock(pMeta);
|
|
|
|
|
|
|
|
|
|
stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId);
|
|
|
|
@ -850,12 +913,15 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|
|
|
|
pTask = *ppTask;
|
|
|
|
|
// it is an fill-history task, remove the related stream task's id that points to it
|
|
|
|
|
if (pTask->info.fillHistory == 0) {
|
|
|
|
|
(void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
|
|
|
|
|
int32_t ret = atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
|
|
|
|
code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
|
|
|
|
doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
|
|
|
|
|
(void)streamMetaRemoveTask(pMeta, &id);
|
|
|
|
|
code = streamMetaRemoveTask(pMeta, &id);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t size = (int32_t) taosHashGetSize(pMeta->pTasksMap);
|
|
|
|
|
int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList);
|
|
|
|
@ -871,7 +937,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|
|
|
|
|
|
|
|
|
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
|
|
|
|
|
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt);
|
|
|
|
|
(void)taosTmrStop(pTask->schedInfo.pDelayTimer);
|
|
|
|
|
streamTmrStop(pTask->schedInfo.pDelayTimer);
|
|
|
|
|
pTask->info.delaySchedParam = 0;
|
|
|
|
|
streamMetaReleaseTask(pMeta, pTask);
|
|
|
|
|
}
|
|
|
|
@ -936,8 +1002,11 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
|
|
|
|
|
|
|
|
|
|
code = tdbTbcMoveToFirst(pCur);
|
|
|
|
|
if (code) {
|
|
|
|
|
(void)tdbTbcClose(pCur);
|
|
|
|
|
stError("failed to open stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
|
|
|
|
|
stError("failed to move stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
|
|
|
|
|
int32_t ret = tdbTbcClose(pCur);
|
|
|
|
|
if (ret != 0) {
|
|
|
|
|
stError("vgId:%d failed to close meta file cursor, code:%s", pMeta->vgId, tstrerror(ret));
|
|
|
|
|
}
|
|
|
|
|
return checkpointId;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -960,7 +1029,11 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
|
|
|
|
|
tdbFree(pKey);
|
|
|
|
|
tdbFree(pVal);
|
|
|
|
|
|
|
|
|
|
(void)tdbTbcClose(pCur);
|
|
|
|
|
int32_t ret = tdbTbcClose(pCur);
|
|
|
|
|
if (ret != 0) {
|
|
|
|
|
stError("vgId:%d failed to close meta file cursor, code:%s", pMeta->vgId, tstrerror(ret));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return checkpointId;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -981,6 +1054,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pRecycleList = taosArrayInit(4, sizeof(STaskId));
|
|
|
|
|
if (pRecycleList == NULL) {
|
|
|
|
|
stError("vgId:%d failed prepare load all tasks, code:out of memory", vgId);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
vgId = pMeta->vgId;
|
|
|
|
|
stInfo("vgId:%d load stream tasks from meta files", vgId);
|
|
|
|
@ -996,7 +1073,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
|
|
|
|
|
taosArrayDestroy(pRecycleList);
|
|
|
|
|
(void)tdbTbcClose(pCur);
|
|
|
|
|
int32_t ret = tdbTbcClose(pCur);
|
|
|
|
|
if (ret != 0) {
|
|
|
|
|
stError("vgId:%d failed to close meta file cursor, code:%s", pMeta->vgId, tstrerror(ret));
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1072,11 +1152,11 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pTask->info.fillHistory == 0) {
|
|
|
|
|
(void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
|
|
|
|
int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (streamTaskShouldPause(pTask)) {
|
|
|
|
|
(void)atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
|
|
|
|
int32_t val = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1090,7 +1170,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|
|
|
|
if (taosArrayGetSize(pRecycleList) > 0) {
|
|
|
|
|
for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
|
|
|
|
|
STaskId* pId = taosArrayGet(pRecycleList, i);
|
|
|
|
|
(void)streamMetaRemoveTask(pMeta, pId);
|
|
|
|
|
code = streamMetaRemoveTask(pMeta, pId);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1099,8 +1182,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|
|
|
|
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
|
|
|
|
|
|
|
|
|
|
taosArrayDestroy(pRecycleList);
|
|
|
|
|
|
|
|
|
|
(void)streamMetaCommit(pMeta);
|
|
|
|
|
code = streamMetaCommit(pMeta);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("vgId:%d failed to commit, code:%s", pMeta->vgId, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
|
|
|
|
@ -1117,7 +1202,10 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
|
|
|
|
|
SStreamTask* pTask = *(SStreamTask**)pIter;
|
|
|
|
|
if (pTask->status.timerActive >= 1) {
|
|
|
|
|
stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart, set closing again", pTask->id.idStr, pMeta->vgId);
|
|
|
|
|
(void)streamTaskStop(pTask);
|
|
|
|
|
int32_t code = streamTaskStop(pTask);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("s-task:%s failed to stop task, code:%s", pTask->id.idStr, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
inTimer = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1150,7 +1238,10 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
|
|
|
|
|
|
|
|
|
SStreamTask* pTask = *(SStreamTask**)pIter;
|
|
|
|
|
stDebug("vgId:%d s-task:%s set task closing flag", vgId, pTask->id.idStr);
|
|
|
|
|
(void)streamTaskStop(pTask);
|
|
|
|
|
int32_t code = streamTaskStop(pTask);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("vgId:%d failed to stop task:0x%x, code:%s", vgId, pTask->id.taskId, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
streamMetaWUnLock(pMeta);
|
|
|
|
@ -1168,7 +1259,6 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
|
|
|
|
SArray* pTaskList = NULL;
|
|
|
|
|
int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
// return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
streamMetaRUnLock(pMeta);
|
|
|
|
@ -1199,14 +1289,17 @@ void streamMetaStartHb(SStreamMeta* pMeta) {
|
|
|
|
|
|
|
|
|
|
void streamMetaRLock(SStreamMeta* pMeta) {
|
|
|
|
|
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
|
|
|
|
|
(void)taosThreadRwlockRdlock(&pMeta->lock);
|
|
|
|
|
int32_t code = taosThreadRwlockRdlock(&pMeta->lock);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("vgId:%d meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void streamMetaRUnLock(SStreamMeta* pMeta) {
|
|
|
|
|
// stTrace("vgId:%d meta-runlock", pMeta->vgId);
|
|
|
|
|
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code);
|
|
|
|
|
stError("vgId:%d meta-runlock failed, code:%s", pMeta->vgId, tstrerror(code));
|
|
|
|
|
} else {
|
|
|
|
|
// stTrace("vgId:%d meta-runlock completed", pMeta->vgId);
|
|
|
|
|
}
|
|
|
|
@ -1214,13 +1307,18 @@ void streamMetaRUnLock(SStreamMeta* pMeta) {
|
|
|
|
|
|
|
|
|
|
void streamMetaWLock(SStreamMeta* pMeta) {
|
|
|
|
|
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
|
|
|
|
|
(void)taosThreadRwlockWrlock(&pMeta->lock);
|
|
|
|
|
// stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
|
|
|
|
|
int32_t code = taosThreadRwlockWrlock(&pMeta->lock);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("vgId:%d failed to apply wlock, code:%s", pMeta->vgId, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void streamMetaWUnLock(SStreamMeta* pMeta) {
|
|
|
|
|
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
|
|
|
|
|
(void)taosThreadRwlockUnlock(&pMeta->lock);
|
|
|
|
|
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("vgId:%d failed to apply wunlock, code:%s", pMeta->vgId, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
|
|
|
|
@ -1258,7 +1356,7 @@ int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
|
|
|
|
|
streamMetaReleaseTask(pMeta, pTask);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
(void)streamMetaSendHbHelper(pMeta);
|
|
|
|
|
code = streamMetaSendHbHelper(pMeta);
|
|
|
|
|
pMeta->sendMsgBeforeClosing = false;
|
|
|
|
|
return TSDB_CODE_SUCCESS; // always return true
|
|
|
|
|
}
|
|
|
|
@ -1348,9 +1446,9 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
|
|
|
|
|
streamMetaRUnLock(pMeta);
|
|
|
|
|
|
|
|
|
|
// add the failed task info, along with the related fill-history task info into tasks list.
|
|
|
|
|
(void)streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
|
|
|
|
|
code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
|
|
|
|
|
if (hasFillhistoryTask) {
|
|
|
|
|
(void)streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
|
|
|
|
|
code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
streamMetaRUnLock(pMeta);
|
|
|
|
@ -1365,12 +1463,18 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
|
|
|
|
|
|
|
|
|
|
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) {
|
|
|
|
|
int32_t startTs = pTask->execInfo.checkTs;
|
|
|
|
|
(void)streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
|
|
|
|
|
int32_t code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("s-task:%s failed to add self task failed to start", pTask->id.idStr, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// automatically set the related fill-history task to be failed.
|
|
|
|
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
|
|
|
|
STaskId* pId = &pTask->hTaskInfo.id;
|
|
|
|
|
(void)streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
|
|
|
|
|
code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
|
|
|
|
|
if (code) {
|
|
|
|
|
stError("s-task:0x%" PRIx64 " failed to add self task failed to start", pId->taskId, tstrerror(code));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|