fix stream transfer error

This commit is contained in:
yihaoDeng 2023-11-08 17:10:54 +08:00
parent 7b0891981e
commit 76fcc49c6d
3 changed files with 66 additions and 46 deletions

View File

@ -152,7 +152,7 @@ void streamFreeQitem(SStreamQueueItem* data);
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
//#define CHECKPOINT_PATH_LEN 128 //#define CHECKPOINT_PATH_LEN 128
//typedef struct SChekpointDataHeader{ // typedef struct SChekpointDataHeader{
// int64_t size; // int64_t size;
// char name[CHECKPOINT_PATH_LEN]; // char name[CHECKPOINT_PATH_LEN];
// char id[CHECKPOINT_PATH_LEN]; // char id[CHECKPOINT_PATH_LEN];
@ -162,6 +162,9 @@ int uploadCheckpoint(char* id, char* path);
int downloadCheckpoint(char* id, char* path); int downloadCheckpoint(char* id, char* path);
int deleteCheckpoint(char* id); int deleteCheckpoint(char* id);
typedef int32_t (*__stream_async_exec_fn_t)(void* param);
int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, void* param, int32_t* code);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "streamInt.h"
#include "rsync.h" #include "rsync.h"
#include "streamInt.h"
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
@ -199,11 +199,11 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
continueDispatchCheckpointBlock(pBlock, pTask); continueDispatchCheckpointBlock(pBlock, pTask);
} else { // only one task exists, no need to dispatch downstream info } else { // only one task exists, no need to dispatch downstream info
atomic_add_fetch_32(&pTask->checkpointNotReadyTasks, 1); atomic_add_fetch_32(&pTask->checkpointNotReadyTasks, 1);
streamProcessCheckpointReadyMsg(pTask); streamProcessCheckpointReadyMsg(pTask);
streamFreeQitem((SStreamQueueItem*)pBlock); streamFreeQitem((SStreamQueueItem*)pBlock);
} }
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) > 0); ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) > 0);
if (pTask->chkInfo.startTs == 0) { if (pTask->chkInfo.startTs == 0) {
@ -229,7 +229,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
id, num); id, num);
streamFreeQitem((SStreamQueueItem*)pBlock); streamFreeQitem((SStreamQueueItem*)pBlock);
streamTaskBuildCheckpoint(pTask); streamTaskBuildCheckpoint(pTask);
} else { // source & agg tasks need to forward the checkpoint msg downwards } else { // source & agg tasks need to forward the checkpoint msg downwards
stDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, continue forwards msg", id, stDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, continue forwards msg", id,
num); num);
@ -331,29 +331,29 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, SStreamTask* p, int64_t chec
return code; return code;
} }
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int32_t streamTaskBuildCheckpointImpl(void* arg) {
int32_t code = 0; int32_t code = 0;
SStreamTask* pTask = arg;
// check for all tasks, and do generate the vnode-wide checkpoint data. // check for all tasks, and do generate the vnode-wide checkpoint data.
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
// int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); // int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
// ASSERT(remain >= 0); // ASSERT(remain >= 0);
double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0; double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0;
// if (remain == 0) { // all tasks are ready // if (remain == 0) { // all tasks are ready
stDebug("s-task:%s all downstreams are ready, ready for do checkpoint", pTask->id.idStr); stDebug("s-task:%s all downstreams are ready, ready for do checkpoint", pTask->id.idStr);
streamBackendDoCheckpoint(pTask->pBackend, pTask->checkpointingId); streamBackendDoCheckpoint(pTask->pBackend, pTask->checkpointingId);
streamSaveAllTaskStatus(pMeta, pTask, pTask->checkpointingId); streamSaveAllTaskStatus(pMeta, pTask, pTask->checkpointingId);
stInfo( stInfo(
"vgId:%d vnode wide checkpoint completed, save all tasks status, last:%s, level:%d elapsed time:%.2f Sec " "vgId:%d vnode wide checkpoint completed, save all tasks status, last:%s, level:%d elapsed time:%.2f Sec "
"checkpointId:%" PRId64, "checkpointId:%" PRId64,
pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, pTask->checkpointingId); pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, pTask->checkpointingId);
// } else { // } else {
// stInfo( // stInfo(
// "vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, level:%d elapsed time:%.2f Sec " // "vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, level:%d elapsed time:%.2f Sec
// "not ready:%d/%d", // " "not ready:%d/%d", pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, remain,
// pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, remain, pMeta->numOfStreamTasks); // pMeta->numOfStreamTasks);
// } // }
// send check point response to upstream task // send check point response to upstream task
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
@ -373,14 +373,17 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
return code; return code;
} }
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t code = 0;
return streamMetaAsyncExec(pTask->pMeta, streamTaskBuildCheckpointImpl, pTask, NULL);
}
// static int64_t kBlockSize = 64 * 1024;
//static int64_t kBlockSize = 64 * 1024; // static int sendCheckpointToS3(char* id, SArray* fileList){
//static int sendCheckpointToS3(char* id, SArray* fileList){
// code = s3PutObjectFromFile2(from->fname, object_name); // code = s3PutObjectFromFile2(from->fname, object_name);
// return 0; // return 0;
//} //}
//static int sendCheckpointToSnode(char* id, SArray* fileList){ // static int sendCheckpointToSnode(char* id, SArray* fileList){
// if(strlen(id) >= CHECKPOINT_PATH_LEN){ // if(strlen(id) >= CHECKPOINT_PATH_LEN){
// tqError("uploadCheckpoint id name too long, name:%s", id); // tqError("uploadCheckpoint id name too long, name:%s", id);
// return -1; // return -1;
@ -466,41 +469,38 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
//} //}
int uploadCheckpoint(char* id, char* path){ int uploadCheckpoint(char* id, char* path) {
if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){ if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
stError("uploadCheckpoint parameters invalid"); stError("uploadCheckpoint parameters invalid");
return -1; return -1;
} }
if(strlen(tsSnodeIp) != 0){ if (strlen(tsSnodeIp) != 0) {
uploadRsync(id, path); uploadRsync(id, path);
// }else if(tsS3StreamEnabled){ // }else if(tsS3StreamEnabled){
} }
return 0; return 0;
} }
int downloadCheckpoint(char* id, char* path){ int downloadCheckpoint(char* id, char* path) {
if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){ if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
stError("downloadCheckpoint parameters invalid"); stError("downloadCheckpoint parameters invalid");
return -1; return -1;
} }
if(strlen(tsSnodeIp) != 0){ if (strlen(tsSnodeIp) != 0) {
downloadRsync(id, path); downloadRsync(id, path);
// }else if(tsS3StreamEnabled){ // }else if(tsS3StreamEnabled){
} }
return 0; return 0;
} }
int deleteCheckpoint(char* id){ int deleteCheckpoint(char* id) {
if(id == NULL || strlen(id) == 0){ if (id == NULL || strlen(id) == 0) {
stError("deleteCheckpoint parameters invalid"); stError("deleteCheckpoint parameters invalid");
return -1; return -1;
} }
if(strlen(tsSnodeIp) != 0){ if (strlen(tsSnodeIp) != 0) {
deleteRsync(id); deleteRsync(id);
// }else if(tsS3StreamEnabled){ // }else if(tsS3StreamEnabled){
} }
return 0; return 0;
} }

View File

@ -490,6 +490,7 @@ void streamMetaCloseImpl(void* arg) {
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
taosThreadMutexDestroy(&pMeta->backendMutex); taosThreadMutexDestroy(&pMeta->backendMutex);
taosCleanUpScheduler(pMeta->qHandle);
pMeta->role = NODE_ROLE_UNINIT; pMeta->role = NODE_ROLE_UNINIT;
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
stDebug("end to close stream meta"); stDebug("end to close stream meta");
@ -1261,3 +1262,19 @@ void streamMetaWUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wunlock", pMeta->vgId); stTrace("vgId:%d meta-wunlock", pMeta->vgId);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
} }
static void execHelper(struct SSchedMsg* pSchedMsg) {
__async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
int32_t code = execFn(pSchedMsg->thandle);
if (code != 0 && pSchedMsg->msg != NULL) {
*(int32_t*)pSchedMsg->msg = code;
}
}
int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, void* param, int32_t* code) {
SSchedMsg schedMsg = {0};
schedMsg.fp = execHelper;
schedMsg.ahandle = fn;
schedMsg.thandle = param;
schedMsg.msg = code;
return taosScheduleTask(pMeta->qHandle, &schedMsg);
}