avoid first tag index conflict

This commit is contained in:
yihaoDeng 2023-09-27 14:44:09 +08:00
parent ed172a939c
commit d4f87378fd
3 changed files with 64 additions and 56 deletions

View File

@ -50,7 +50,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
FAIL:
if (pMsg->info.handle == NULL) return;
SRpcMsg rsp = { .code = code, .info = pMsg->info};
SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
@ -62,6 +62,8 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pTask->pBackend = NULL;
ASSERT(0);
streamTaskOpenAllUpstreamInput(pTask);
@ -73,8 +75,11 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
qDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamInfoList);
SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory };
int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamInfoList);
SReadHandle handle = {.vnode = NULL,
.numOfVgroups = numOfChildEp,
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory};
initStreamStateAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0, pTask->id.taskId);
@ -83,23 +88,23 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
streamTaskResetUpstreamStageInfo(pTask);
streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
SCheckpointInfo *pChkInfo = &pTask->chkInfo;
// checkpoint ver is the kept version, handled data should be the next version.
if (pTask->chkInfo.checkpointId != 0) {
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64,
pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
} else {
if (pTask->chkInfo.nextProcessVer == -1) {
pTask->chkInfo.nextProcessVer = 0;
}
}
qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->info.triggerParam);
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->info.triggerParam);
return 0;
}
@ -179,8 +184,8 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
taosWUnLockLatch(&pSnode->pMeta->lock);
qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE,
pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
streamTaskCheckDownstream(pTask);
return 0;
@ -190,7 +195,7 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId);
return 0;
@ -355,12 +360,13 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage);
streamMetaReleaseTask(pSnode->pMeta, pTask);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
const char *pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
qDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else {
rsp.status = 0;
qDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
qDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp status %d",
taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
@ -388,15 +394,15 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
return 0;
}
int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t sndProcessStreamTaskCheckRsp(SSnode *pSnode, SRpcMsg *pMsg) {
char *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code;
SStreamTaskCheckRsp rsp;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pReq, len);
tDecoderInit(&decoder, (uint8_t *)pReq, len);
code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
if (code < 0) {
@ -405,13 +411,13 @@ int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) {
}
tDecoderClear(&decoder);
qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId,
rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.streamId, rsp.upstreamTaskId);
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.streamId, rsp.upstreamTaskId);
if (pTask == NULL) {
qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
pSnode->pMeta->vgId);
pSnode->pMeta->vgId);
return -1;
}

View File

@ -221,14 +221,16 @@ int32_t streamMetaMayConvertBackendFormat(SStreamMeta* pMeta) {
}
void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key) {
taosThreadMutexLock(&pMeta->backendMutex);
void** ppBackend = taosHashGet(pMeta->pTaskBackendUnique, key, strlen(key));
if (ppBackend != NULL && *ppBackend != NULL) {
// add ref later
taosThreadMutexUnlock(&pMeta->backendMutex);
return *ppBackend;
}
void* pBackend = streamStateOpenTaskBackend(pMeta->path, key);
taosHashPut(pMeta->pTaskBackendUnique, key, strlen(key), &pBackend, sizeof(void*));
taosThreadMutexLock(&pMeta->backendMutex);
return pBackend;
}
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {

View File

@ -106,43 +106,43 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
}
SStreamTask* pStreamTask = pTask;
char statePath[1024];
if (!specPath) {
sprintf(statePath, "%s%s%d", path, TD_DIRSEP, pStreamTask->id.taskId);
} else {
memset(statePath, 0, 1024);
tstrncpy(statePath, path, 1024);
}
// char statePath[1024];
// if (!specPath) {
// sprintf(statePath, "%s%s%d", path, TD_DIRSEP, pStreamTask->id.taskId);
// } else {
// memset(statePath, 0, 1024);
// tstrncpy(statePath, path, 1024);
// }
pState->taskId = pStreamTask->id.taskId;
pState->streamId = pStreamTask->id.streamId;
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId);
#ifdef USE_ROCKSDB
SStreamMeta* pMeta = pStreamTask->pMeta;
pState->streamBackendRid = pMeta->streamBackendRid;
// taosWLockLatch(&pMeta->lock);
taosThreadMutexLock(&pMeta->backendMutex);
void* uniqueId =
taosHashGet(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
if (uniqueId == NULL) {
int code = streamStateOpenBackend(pMeta->streamBackend, pState);
if (code == -1) {
taosThreadMutexUnlock(&pMeta->backendMutex);
taosMemoryFree(pState);
return NULL;
}
taosHashPut(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1,
&pState->pTdbState->backendCfWrapperId, sizeof(pState->pTdbState->backendCfWrapperId));
} else {
int64_t id = *(int64_t*)uniqueId;
pState->pTdbState->backendCfWrapperId = id;
pState->pTdbState->pBackendCfWrapper = taosAcquireRef(streamBackendCfWrapperId, id);
// already exist stream task for
qInfo("already exist stream-state for %s", pState->pTdbState->idstr);
// taosAcquireRef(streamBackendId, pState->streamBackendRid);
}
taosThreadMutexUnlock(&pMeta->backendMutex);
// SStreamMeta* pMeta = pStreamTask->pMeta;
// pState->streamBackendRid = pMeta->streamBackendRid;
// taosWLockLatch(&pMeta->lock);
// taosThreadMutexLock(&pMeta->backendMutex);
// void* uniqueId =
// taosHashGet(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
// if (uniqueId == NULL) {
// int code = streamStateOpenBackend(pMeta->streamBackend, pState);
// if (code == -1) {
// taosThreadMutexUnlock(&pMeta->backendMutex);
// taosMemoryFree(pState);
// return NULL;
// }
// taosHashPut(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1,
// &pState->pTdbState->backendCfWrapperId, sizeof(pState->pTdbState->backendCfWrapperId));
// } else {
// int64_t id = *(int64_t*)uniqueId;
// pState->pTdbState->backendCfWrapperId = id;
// pState->pTdbState->pBackendCfWrapper = taosAcquireRef(streamBackendCfWrapperId, id);
// // already exist stream task for
// qInfo("already exist stream-state for %s", pState->pTdbState->idstr);
// // taosAcquireRef(streamBackendId, pState->streamBackendRid);
// }
// taosThreadMutexUnlock(&pMeta->backendMutex);
pState->pTdbState->pOwner = pTask;
pState->pFileState = NULL;
@ -1125,7 +1125,7 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal
void streamStateDestroy(SStreamState* pState, bool remove) {
#ifdef USE_ROCKSDB
streamFileStateDestroy(pState->pFileState);
streamStateDestroy_rocksdb(pState, remove);
// streamStateDestroy_rocksdb(pState, remove);
tSimpleHashCleanup(pState->parNameMap);
// do nothong
#endif