Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2
This commit is contained in:
commit
a049e310f6
|
@ -99,9 +99,6 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
|
||||||
|
|
||||||
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
|
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
// todo refactor
|
|
||||||
void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set multiple input data blocks for the stream scan.
|
* Set multiple input data blocks for the stream scan.
|
||||||
* @param tinfo
|
* @param tinfo
|
||||||
|
|
|
@ -69,8 +69,6 @@ typedef struct {
|
||||||
SVersionRange fillHistoryVer;
|
SVersionRange fillHistoryVer;
|
||||||
STimeWindow fillHistoryWindow;
|
STimeWindow fillHistoryWindow;
|
||||||
SStreamState* pState;
|
SStreamState* pState;
|
||||||
int64_t dataVersion;
|
|
||||||
int64_t checkPointId;
|
|
||||||
} SStreamTaskInfo;
|
} SStreamTaskInfo;
|
||||||
|
|
||||||
struct SExecTaskInfo {
|
struct SExecTaskInfo {
|
||||||
|
|
|
@ -223,12 +223,6 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId) {
|
|
||||||
SExecTaskInfo* pTaskInfo = tinfo;
|
|
||||||
*dataVer = pTaskInfo->streamInfo.dataVersion;
|
|
||||||
*ckId = pTaskInfo->streamInfo.checkPointId;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
|
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
|
||||||
if (tinfo == NULL) {
|
if (tinfo == NULL) {
|
||||||
return TSDB_CODE_APP_ERROR;
|
return TSDB_CODE_APP_ERROR;
|
||||||
|
|
|
@ -2318,11 +2318,6 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
|
||||||
return startPos;
|
return startPos;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version, int64_t ckId) {
|
|
||||||
pTaskInfo->streamInfo.dataVersion = version;
|
|
||||||
pTaskInfo->streamInfo.checkPointId = ckId;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
|
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
|
||||||
SSHashObj* pUpdatedMap) {
|
SSHashObj* pUpdatedMap) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
|
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
|
||||||
|
@ -2823,7 +2818,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
doStreamIntervalSaveCheckpoint(pOperator);
|
doStreamIntervalSaveCheckpoint(pOperator);
|
||||||
pAPI->stateStore.streamStateCommit(pInfo->pState);
|
pAPI->stateStore.streamStateCommit(pInfo->pState);
|
||||||
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
|
|
||||||
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64,
|
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64,
|
||||||
|
@ -3086,7 +3080,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
if (res == TSDB_CODE_SUCCESS) {
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
doStreamIntervalDecodeOpState(buff, pOperator);
|
doStreamIntervalDecodeOpState(buff, pOperator);
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -3953,7 +3946,6 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
doStreamSessionSaveCheckpoint(pOperator);
|
doStreamSessionSaveCheckpoint(pOperator);
|
||||||
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
|
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
|
||||||
setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pAggSup->pState->checkPointId);
|
|
||||||
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
|
@ -4154,7 +4146,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
||||||
if (res == TSDB_CODE_SUCCESS) {
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
doStreamSessionDecodeOpState(buff, pOperator);
|
doStreamSessionDecodeOpState(buff, pOperator);
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
||||||
|
@ -4256,7 +4247,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
doStreamSessionSaveCheckpoint(pOperator);
|
doStreamSessionSaveCheckpoint(pOperator);
|
||||||
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
|
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
|
||||||
setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pAggSup->pState->checkPointId);
|
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
|
@ -4681,7 +4671,6 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
doStreamSessionSaveCheckpoint(pOperator);
|
doStreamSessionSaveCheckpoint(pOperator);
|
||||||
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
|
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
|
||||||
setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId);
|
|
||||||
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
|
@ -4878,7 +4867,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
if (res == TSDB_CODE_SUCCESS) {
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
doStreamStateDecodeOpState(buff, pOperator);
|
doStreamStateDecodeOpState(buff, pOperator);
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
|
||||||
|
@ -5548,7 +5536,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
doStreamIntervalSaveCheckpoint(pOperator);
|
doStreamIntervalSaveCheckpoint(pOperator);
|
||||||
pAPI->stateStore.streamStateCommit(pInfo->pState);
|
pAPI->stateStore.streamStateCommit(pInfo->pState);
|
||||||
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
|
|
||||||
pInfo->reCkBlock = true;
|
pInfo->reCkBlock = true;
|
||||||
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
||||||
qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack);
|
qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack);
|
||||||
|
@ -5735,7 +5722,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
if (res == TSDB_CODE_SUCCESS) {
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
doStreamIntervalDecodeOpState(buff, pOperator);
|
doStreamIntervalDecodeOpState(buff, pOperator);
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
initIntervalDownStream(downstream, pPhyNode->type, pInfo);
|
initIntervalDownStream(downstream, pPhyNode->type, pInfo);
|
||||||
|
|
|
@ -47,6 +47,7 @@ typedef struct {
|
||||||
void* streamBackendInit(const char* path);
|
void* streamBackendInit(const char* path);
|
||||||
void streamBackendCleanup(void* arg);
|
void streamBackendCleanup(void* arg);
|
||||||
void streamBackendHandleCleanup(void* arg);
|
void streamBackendHandleCleanup(void* arg);
|
||||||
|
int32_t streamBackendLoadCheckpointInfo(void* pMeta);
|
||||||
int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId);
|
int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId);
|
||||||
SListNode* streamBackendAddCompare(void* backend, void* arg);
|
SListNode* streamBackendAddCompare(void* backend, void* arg);
|
||||||
void streamBackendDelCompare(void* backend, void* arg);
|
void streamBackendDelCompare(void* backend, void* arg);
|
||||||
|
|
|
@ -393,7 +393,7 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) {
|
||||||
for (int i = 0; i < taosArrayGetSize(checkpointDel); i++) {
|
for (int i = 0; i < taosArrayGetSize(checkpointDel); i++) {
|
||||||
int64_t id = *(int64_t*)taosArrayGet(checkpointDel, i);
|
int64_t id = *(int64_t*)taosArrayGet(checkpointDel, i);
|
||||||
char tbuf[256] = {0};
|
char tbuf[256] = {0};
|
||||||
sprintf(tbuf, "%s/checkpoint_%" PRId64 "", path, id);
|
sprintf(tbuf, "%s/checkpoint-%" PRId64 "", path, id);
|
||||||
if (taosIsDir(tbuf)) {
|
if (taosIsDir(tbuf)) {
|
||||||
taosRemoveDir(tbuf);
|
taosRemoveDir(tbuf);
|
||||||
}
|
}
|
||||||
|
@ -402,11 +402,63 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t compareCheckpoint(const void* a, const void* b) {
|
||||||
|
int64_t x = *(int64_t*)a;
|
||||||
|
int64_t y = *(int64_t*)b;
|
||||||
|
return x < y ? -1 : 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamBackendLoadCheckpointInfo(void* arg) {
|
||||||
|
SStreamMeta* pMeta = arg;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
int32_t len = strlen(pMeta->path) + 30;
|
||||||
|
char* checkpointPath = taosMemoryCalloc(1, len);
|
||||||
|
sprintf(checkpointPath, "%s/%s", pMeta->path, "checkpoints");
|
||||||
|
|
||||||
|
if (!taosDirExist(checkpointPath)) {
|
||||||
|
return 0;
|
||||||
|
// no checkpoint, nothing to load
|
||||||
|
}
|
||||||
|
|
||||||
|
TdDirPtr pDir = taosOpenDir(checkpointPath);
|
||||||
|
if (pDir == NULL) return 0;
|
||||||
|
|
||||||
|
TdDirEntryPtr de = NULL;
|
||||||
|
SArray* suffix = taosArrayInit(4, sizeof(int64_t));
|
||||||
|
|
||||||
|
while ((de = taosReadDir(pDir)) != NULL) {
|
||||||
|
if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;
|
||||||
|
|
||||||
|
if (taosDirEntryIsDir(de)) {
|
||||||
|
char checkpointPrefix[32] = {0};
|
||||||
|
int64_t checkpointId = 0;
|
||||||
|
|
||||||
|
int ret = sscanf(taosGetDirEntryName(de), "checkpoint-%" PRId64 "", &checkpointId);
|
||||||
|
if (ret == 1) {
|
||||||
|
taosArrayPush(suffix, &checkpointId);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosArraySort(suffix, compareCheckpoint);
|
||||||
|
|
||||||
|
for (int i = 0; i < taosArrayGetSize(suffix); i++) {
|
||||||
|
int64_t id = *(int64_t*)taosArrayGet(suffix, i);
|
||||||
|
taosArrayPush(pMeta->checkpointSaved, &id);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(suffix);
|
||||||
|
taosCloseDir(&pDir);
|
||||||
|
taosMemoryFree(checkpointPath);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
|
int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
|
||||||
SStreamMeta* pMeta = arg;
|
SStreamMeta* pMeta = arg;
|
||||||
int64_t backendRid = pMeta->streamBackendRid;
|
int64_t backendRid = pMeta->streamBackendRid;
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
char path[256] = {0};
|
char path[256] = {0};
|
||||||
sprintf(path, "%s/%s", pMeta->path, "checkpoints");
|
sprintf(path, "%s/%s", pMeta->path, "checkpoints");
|
||||||
|
@ -417,7 +469,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
char checkpointDir[256] = {0};
|
char checkpointDir[256] = {0};
|
||||||
snprintf(checkpointDir, tListLen(checkpointDir),"%s/checkpoint_%" PRIu64, path, checkpointId);
|
snprintf(checkpointDir, tListLen(checkpointDir), "%s/checkpoint-%" PRId64, path, checkpointId);
|
||||||
|
|
||||||
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
|
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
|
@ -1203,8 +1255,8 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* pChkptFileName, rocksdb_snapshot_t** snapshot,
|
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* pChkptFileName,
|
||||||
rocksdb_readoptions_t** readOpt) {
|
rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt) {
|
||||||
int idx = streamStateGetCfIdx(pState, pChkptFileName);
|
int idx = streamStateGetCfIdx(pState, pChkptFileName);
|
||||||
|
|
||||||
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
|
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
|
||||||
|
|
|
@ -18,8 +18,6 @@
|
||||||
// maximum allowed processed block batches. One block may include several submit blocks
|
// maximum allowed processed block batches. One block may include several submit blocks
|
||||||
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
|
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
|
||||||
|
|
||||||
static int32_t updateCheckPointInfo(SStreamTask* pTask, int64_t checkpointId);
|
|
||||||
|
|
||||||
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
|
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
|
||||||
int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
|
int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
|
||||||
return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
|
return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
|
||||||
|
|
|
@ -100,6 +100,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
if (pMeta->streamBackend == NULL) {
|
if (pMeta->streamBackend == NULL) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
|
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
|
||||||
pMeta->pTaskBackendUnique =
|
pMeta->pTaskBackendUnique =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
|
@ -108,6 +109,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
pMeta->checkpointCap = 4;
|
pMeta->checkpointCap = 4;
|
||||||
taosInitRWLatch(&pMeta->checkpointDirLock);
|
taosInitRWLatch(&pMeta->checkpointDirLock);
|
||||||
|
|
||||||
|
code = streamBackendLoadCheckpointInfo(pMeta);
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
taosMemoryFree(streamPath);
|
taosMemoryFree(streamPath);
|
||||||
|
|
||||||
taosInitRWLatch(&pMeta->lock);
|
taosInitRWLatch(&pMeta->lock);
|
||||||
|
@ -310,7 +317,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||||
|
|
||||||
qDebug("s-task:0x%x set task status:%s", taskId, streamGetTaskStatusStr(TASK_STATUS__DROPPING));
|
qDebug("s-task:0x%x set task status:%s", taskId, streamGetTaskStatusStr(TASK_STATUS__DROPPING));
|
||||||
|
|
||||||
while(1) {
|
while (1) {
|
||||||
taosRLockLatch(&pMeta->lock);
|
taosRLockLatch(&pMeta->lock);
|
||||||
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue