add cvt state
This commit is contained in:
parent
970b1cb840
commit
11da631bb5
|
@ -31,7 +31,7 @@ extern "C" {
|
||||||
|
|
||||||
typedef struct SStreamTask SStreamTask;
|
typedef struct SStreamTask SStreamTask;
|
||||||
|
|
||||||
#define SSTREAM_TASK_VER 3
|
#define SSTREAM_TASK_VER 3
|
||||||
#define SSTREAM_TASK_INCOMPATIBLE_VER 1
|
#define SSTREAM_TASK_INCOMPATIBLE_VER 1
|
||||||
#define SSTREAM_TASK_NEED_CONVERT_VER 2
|
#define SSTREAM_TASK_NEED_CONVERT_VER 2
|
||||||
|
|
||||||
|
@ -382,6 +382,7 @@ struct SStreamTask {
|
||||||
int32_t transferStateAlignCnt;
|
int32_t transferStateAlignCnt;
|
||||||
struct SStreamMeta* pMeta;
|
struct SStreamMeta* pMeta;
|
||||||
SSHashObj* pNameMap;
|
SSHashObj* pNameMap;
|
||||||
|
void* pBackend;
|
||||||
char reserve[256];
|
char reserve[256];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -422,6 +423,8 @@ typedef struct SStreamMeta {
|
||||||
int32_t chkpCap;
|
int32_t chkpCap;
|
||||||
SRWLatch chkpDirLock;
|
SRWLatch chkpDirLock;
|
||||||
int32_t pauseTaskNum;
|
int32_t pauseTaskNum;
|
||||||
|
|
||||||
|
// SHashObj* pTaskBackend;
|
||||||
} SStreamMeta;
|
} SStreamMeta;
|
||||||
|
|
||||||
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
|
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
|
||||||
|
@ -722,6 +725,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId);
|
||||||
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
||||||
void streamMetaNotifyClose(SStreamMeta* pMeta);
|
void streamMetaNotifyClose(SStreamMeta* pMeta);
|
||||||
|
void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key);
|
||||||
|
|
||||||
// checkpoint
|
// checkpoint
|
||||||
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
||||||
|
|
|
@ -582,9 +582,9 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
taosWLockLatch(&pTq->lock);
|
taosWLockLatch(&pTq->lock);
|
||||||
bool exec = tqIsHandleExec(pHandle);
|
bool exec = tqIsHandleExec(pHandle);
|
||||||
|
|
||||||
if(exec){
|
if (exec) {
|
||||||
tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
|
tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
|
||||||
pHandle->subKey, pHandle);
|
pHandle->subKey, pHandle);
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
continue;
|
continue;
|
||||||
|
@ -699,12 +699,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
}
|
}
|
||||||
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
|
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
|
||||||
} else {
|
} else {
|
||||||
while(1){
|
while (1) {
|
||||||
taosWLockLatch(&pTq->lock);
|
taosWLockLatch(&pTq->lock);
|
||||||
bool exec = tqIsHandleExec(pHandle);
|
bool exec = tqIsHandleExec(pHandle);
|
||||||
if(exec){
|
if (exec) {
|
||||||
tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p", pTq->pVnode->config.vgId,
|
tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p",
|
||||||
pHandle->subKey, pHandle);
|
pTq->pVnode->config.vgId, pHandle->subKey, pHandle);
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
continue;
|
continue;
|
||||||
|
@ -713,7 +713,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
|
tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
|
||||||
} else {
|
} else {
|
||||||
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
||||||
req.newConsumerId);
|
req.newConsumerId);
|
||||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||||
atomic_store_32(&pHandle->epoch, 0);
|
atomic_store_32(&pHandle->epoch, 0);
|
||||||
tqUnregisterPushHandle(pTq, pHandle);
|
tqUnregisterPushHandle(pTq, pHandle);
|
||||||
|
@ -736,6 +736,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
tqDebug("s-task:0x%x start to expand task", pTask->id.taskId);
|
tqDebug("s-task:0x%x start to expand task", pTask->id.taskId);
|
||||||
|
|
||||||
int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver);
|
int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver);
|
||||||
|
|
||||||
|
pTask->pBackend = streamMetaGetBackendByTaskKey(pTq->pStreamMeta, (char*)pTask->id.idStr);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -859,8 +862,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64
|
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
||||||
" child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
|
" nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
|
||||||
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
||||||
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
|
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||||
pTask->info.fillHistory, pTask->info.triggerParam);
|
pTask->info.fillHistory, pTask->info.triggerParam);
|
||||||
|
@ -1004,7 +1007,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
bool restored = pTq->pVnode->restored;
|
bool restored = pTq->pVnode->restored;
|
||||||
if (p != NULL && restored) {
|
if (p != NULL && restored) {
|
||||||
p->tsInfo.init = taosGetTimestampMs();
|
p->tsInfo.init = taosGetTimestampMs();
|
||||||
tqDebug("s-task:%s set the init ts:%"PRId64, p->id.idStr, p->tsInfo.init);
|
tqDebug("s-task:%s set the init ts:%" PRId64, p->id.idStr, p->tsInfo.init);
|
||||||
|
|
||||||
streamTaskCheckDownstream(p);
|
streamTaskCheckDownstream(p);
|
||||||
} else if (!restored) {
|
} else if (!restored) {
|
||||||
|
@ -1443,7 +1446,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int8_t status = pTask->status.taskStatus;
|
int8_t status = pTask->status.taskStatus;
|
||||||
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) {
|
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) {
|
||||||
// no lock needs to secure the access of the version
|
// no lock needs to secure the access of the version
|
||||||
if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
|
if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
|
||||||
|
@ -1622,12 +1625,13 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
// downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req.
|
// downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req.
|
||||||
if (pTask->status.downstreamReady != 1) {
|
if (pTask->status.downstreamReady != 1) {
|
||||||
qError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
|
qError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
|
||||||
", set it failure", pTask->id.idStr, req.checkpointId);
|
", set it failure",
|
||||||
|
pTask->id.idStr, req.checkpointId);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
SRpcMsg rsp = {0};
|
SRpcMsg rsp = {0};
|
||||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||||
tmsgSendRsp(&rsp); // error occurs
|
tmsgSendRsp(&rsp); // error occurs
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1784,7 +1788,6 @@ _end:
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
if (allStopped) {
|
if (allStopped) {
|
||||||
|
|
||||||
if (!pTq->pVnode->restored) {
|
if (!pTq->pVnode->restored) {
|
||||||
tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId);
|
tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1816,4 +1819,3 @@ _end:
|
||||||
|
|
||||||
return rsp.code;
|
return rsp.code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -220,6 +220,17 @@ int32_t streamMetaMayDoStateBackendConvert(SStreamMeta* pMeta) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key) {
|
||||||
|
void** ppBackend = taosHashGet(pMeta->pTaskBackendUnique, key, strlen(key));
|
||||||
|
if (ppBackend != NULL && *ppBackend != NULL) {
|
||||||
|
// add ref later
|
||||||
|
return *ppBackend;
|
||||||
|
}
|
||||||
|
void* pBackend = streamStateOpenTaskBackend(pMeta->path, key);
|
||||||
|
taosHashPut(pMeta->pTaskBackendUnique, key, strlen(key), &pBackend, sizeof(void*));
|
||||||
|
|
||||||
|
return pBackend;
|
||||||
|
}
|
||||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {
|
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
||||||
|
@ -277,11 +288,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
pMeta->hbInfo.tickCounter = 0;
|
pMeta->hbInfo.tickCounter = 0;
|
||||||
pMeta->hbInfo.stopFlag = 0;
|
pMeta->hbInfo.stopFlag = 0;
|
||||||
|
|
||||||
|
pMeta->pTaskBackendUnique =
|
||||||
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
// start backend
|
// start backend
|
||||||
// taosInitRWLatch(&pMeta->chkpDirLock);
|
// taosInitRWLatch(&pMeta->chkpDirLock);
|
||||||
|
|
||||||
// pMeta->pTaskBackendUnique =
|
|
||||||
// taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
|
||||||
// pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t));
|
// pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t));
|
||||||
// pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t));
|
// pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t));
|
||||||
// pMeta->chkpCap = 8;
|
// pMeta->chkpCap = 8;
|
||||||
|
|
|
@ -388,6 +388,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
||||||
taosThreadMutexInit(&pTask->lock, NULL);
|
taosThreadMutexInit(&pTask->lock, NULL);
|
||||||
streamTaskOpenAllUpstreamInput(pTask);
|
streamTaskOpenAllUpstreamInput(pTask);
|
||||||
|
|
||||||
|
//pTask->pBackend = streamStateOpenTaskBackend(pMeta->path, (char*)pTask->id.idStr);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue