diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 095461bd92..418c4c2848 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -152,7 +152,7 @@ void streamFreeQitem(SStreamQueueItem* data); int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); //#define CHECKPOINT_PATH_LEN 128 -//typedef struct SChekpointDataHeader{ +// typedef struct SChekpointDataHeader{ // int64_t size; // char name[CHECKPOINT_PATH_LEN]; // char id[CHECKPOINT_PATH_LEN]; @@ -162,6 +162,9 @@ int uploadCheckpoint(char* id, char* path); int downloadCheckpoint(char* id, char* path); int deleteCheckpoint(char* id); +typedef int32_t (*__stream_async_exec_fn_t)(void* param); + +int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, void* param, int32_t* code); #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index bf2c89bea4..3f1be03e75 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#include "streamInt.h" #include "rsync.h" +#include "streamInt.h" int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; @@ -199,11 +199,11 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); continueDispatchCheckpointBlock(pBlock, pTask); - } else { // only one task exists, no need to dispatch downstream info - atomic_add_fetch_32(&pTask->checkpointNotReadyTasks, 1); - streamProcessCheckpointReadyMsg(pTask); - streamFreeQitem((SStreamQueueItem*)pBlock); - } + } else { // only one task exists, no need to dispatch downstream info + atomic_add_fetch_32(&pTask->checkpointNotReadyTasks, 1); + streamProcessCheckpointReadyMsg(pTask); + streamFreeQitem((SStreamQueueItem*)pBlock); + } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) > 0); if (pTask->chkInfo.startTs == 0) { @@ -229,7 +229,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc id, num); streamFreeQitem((SStreamQueueItem*)pBlock); streamTaskBuildCheckpoint(pTask); - } else { // source & agg tasks need to forward the checkpoint msg downwards + } else { // source & agg tasks need to forward the checkpoint msg downwards stDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, continue forwards msg", id, num); @@ -331,29 +331,29 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, SStreamTask* p, int64_t chec return code; } -int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { - int32_t code = 0; - +int32_t streamTaskBuildCheckpointImpl(void* arg) { + int32_t code = 0; + SStreamTask* pTask = arg; // check for all tasks, and do generate the vnode-wide checkpoint data. SStreamMeta* pMeta = pTask->pMeta; -// int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); -// ASSERT(remain >= 0); + // int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); + // ASSERT(remain >= 0); double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0; -// if (remain == 0) { // all tasks are ready - stDebug("s-task:%s all downstreams are ready, ready for do checkpoint", pTask->id.idStr); - streamBackendDoCheckpoint(pTask->pBackend, pTask->checkpointingId); - streamSaveAllTaskStatus(pMeta, pTask, pTask->checkpointingId); - stInfo( - "vgId:%d vnode wide checkpoint completed, save all tasks status, last:%s, level:%d elapsed time:%.2f Sec " - "checkpointId:%" PRId64, - pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, pTask->checkpointingId); -// } else { -// stInfo( -// "vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, level:%d elapsed time:%.2f Sec " -// "not ready:%d/%d", -// pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, remain, pMeta->numOfStreamTasks); -// } + // if (remain == 0) { // all tasks are ready + stDebug("s-task:%s all downstreams are ready, ready for do checkpoint", pTask->id.idStr); + streamBackendDoCheckpoint(pTask->pBackend, pTask->checkpointingId); + streamSaveAllTaskStatus(pMeta, pTask, pTask->checkpointingId); + stInfo( + "vgId:%d vnode wide checkpoint completed, save all tasks status, last:%s, level:%d elapsed time:%.2f Sec " + "checkpointId:%" PRId64, + pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, pTask->checkpointingId); + // } else { + // stInfo( + // "vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, level:%d elapsed time:%.2f Sec + // " "not ready:%d/%d", pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, remain, + // pMeta->numOfStreamTasks); + // } // send check point response to upstream task if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { @@ -373,14 +373,17 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { return code; } +int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { + int32_t code = 0; + return streamMetaAsyncExec(pTask->pMeta, streamTaskBuildCheckpointImpl, pTask, NULL); +} - -//static int64_t kBlockSize = 64 * 1024; -//static int sendCheckpointToS3(char* id, SArray* fileList){ +// static int64_t kBlockSize = 64 * 1024; +// static int sendCheckpointToS3(char* id, SArray* fileList){ // code = s3PutObjectFromFile2(from->fname, object_name); // return 0; //} -//static int sendCheckpointToSnode(char* id, SArray* fileList){ +// static int sendCheckpointToSnode(char* id, SArray* fileList){ // if(strlen(id) >= CHECKPOINT_PATH_LEN){ // tqError("uploadCheckpoint id name too long, name:%s", id); // return -1; @@ -466,41 +469,38 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { //} -int uploadCheckpoint(char* id, char* path){ - if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){ +int uploadCheckpoint(char* id, char* path) { + if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { stError("uploadCheckpoint parameters invalid"); return -1; } - if(strlen(tsSnodeIp) != 0){ + if (strlen(tsSnodeIp) != 0) { uploadRsync(id, path); -// }else if(tsS3StreamEnabled){ - + // }else if(tsS3StreamEnabled){ } return 0; } -int downloadCheckpoint(char* id, char* path){ - if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){ +int downloadCheckpoint(char* id, char* path) { + if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { stError("downloadCheckpoint parameters invalid"); return -1; } - if(strlen(tsSnodeIp) != 0){ + if (strlen(tsSnodeIp) != 0) { downloadRsync(id, path); -// }else if(tsS3StreamEnabled){ - + // }else if(tsS3StreamEnabled){ } return 0; } -int deleteCheckpoint(char* id){ - if(id == NULL || strlen(id) == 0){ +int deleteCheckpoint(char* id) { + if (id == NULL || strlen(id) == 0) { stError("deleteCheckpoint parameters invalid"); return -1; } - if(strlen(tsSnodeIp) != 0){ + if (strlen(tsSnodeIp) != 0) { deleteRsync(id); -// }else if(tsS3StreamEnabled){ - + // }else if(tsS3StreamEnabled){ } return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5e5166cc34..4b344478a1 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -490,6 +490,7 @@ void streamMetaCloseImpl(void* arg) { taosMemoryFree(pMeta->path); taosThreadMutexDestroy(&pMeta->backendMutex); + taosCleanUpScheduler(pMeta->qHandle); pMeta->role = NODE_ROLE_UNINIT; taosMemoryFree(pMeta); stDebug("end to close stream meta"); @@ -1261,3 +1262,19 @@ void streamMetaWUnLock(SStreamMeta* pMeta) { stTrace("vgId:%d meta-wunlock", pMeta->vgId); taosWUnLockLatch(&pMeta->lock); } +static void execHelper(struct SSchedMsg* pSchedMsg) { + __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; + int32_t code = execFn(pSchedMsg->thandle); + if (code != 0 && pSchedMsg->msg != NULL) { + *(int32_t*)pSchedMsg->msg = code; + } +} + +int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, void* param, int32_t* code) { + SSchedMsg schedMsg = {0}; + schedMsg.fp = execHelper; + schedMsg.ahandle = fn; + schedMsg.thandle = param; + schedMsg.msg = code; + return taosScheduleTask(pMeta->qHandle, &schedMsg); +}