add other
This commit is contained in:
parent
f4a3567935
commit
483f9ab896
|
@ -1016,7 +1016,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
|
||||||
|
|
||||||
void *buf;
|
void *buf;
|
||||||
int32_t tlen;
|
int32_t tlen;
|
||||||
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->nodeId, checkpointId) < 0) {
|
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId) < 0) {
|
||||||
mndReleaseVgroup(pMnode, pVgObj);
|
mndReleaseVgroup(pMnode, pVgObj);
|
||||||
taosRUnLockLatch(&pStream->lock);
|
taosRUnLockLatch(&pStream->lock);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
@ -1364,7 +1364,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
// task id
|
// task id
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
|
||||||
char idstr[128] = {0};
|
char idstr[128] = {0};
|
||||||
int32_t len = tintToHex(pTask->id.taskId, &idstr[4]);
|
int32_t len = tintToHex(pTask->id.taskId, &idstr[4]);
|
||||||
idstr[2] = '0';
|
idstr[2] = '0';
|
||||||
idstr[3] = 'x';
|
idstr[3] = 'x';
|
||||||
|
@ -1404,7 +1404,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&level, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)&level, false);
|
||||||
|
|
||||||
// status
|
// status
|
||||||
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
||||||
int8_t taskStatus = atomic_load_8(&pTask->status.taskStatus);
|
int8_t taskStatus = atomic_load_8(&pTask->status.taskStatus);
|
||||||
if (taskStatus == TASK_STATUS__NORMAL) {
|
if (taskStatus == TASK_STATUS__NORMAL) {
|
||||||
memcpy(varDataVal(status), "normal", 6);
|
memcpy(varDataVal(status), "normal", 6);
|
||||||
|
@ -1470,7 +1470,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray* tasks) {
|
int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray *tasks) {
|
||||||
int32_t size = taosArrayGetSize(tasks);
|
int32_t size = taosArrayGetSize(tasks);
|
||||||
for (int32_t i = 0; i < size; i++) {
|
for (int32_t i = 0; i < size; i++) {
|
||||||
SArray *pTasks = taosArrayGetP(tasks, i);
|
SArray *pTasks = taosArrayGetP(tasks, i);
|
||||||
|
|
|
@ -401,12 +401,12 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int32_t streamBackendDoCheckpoint(void* arg, const char* path) {
|
int32_t streamBackendDoCheckpoint(void* arg, const char* path) {
|
||||||
SStreamMeta* pMeta = arg;
|
SStreamMeta* pMeta = arg;
|
||||||
int64_t backendRid = pMeta->streamBackendRid;
|
int64_t backendRid = pMeta->streamBackendRid;
|
||||||
int64_t checkpointId = pMeta->checkpointTs;
|
int64_t checkpointId = pMeta->checkpointTs;
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SBackendHandle* pHandle = taosAcquireRef(streamBackendId, backendRid);
|
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
|
||||||
|
|
||||||
char checkpointDir[256] = {0};
|
char checkpointDir[256] = {0};
|
||||||
sprintf(checkpointDir, "%s/checkpoint_%" PRId64 "", path, checkpointId);
|
sprintf(checkpointDir, "%s/checkpoint_%" PRId64 "", path, checkpointId);
|
||||||
|
|
Loading…
Reference in New Issue