diff --git a/cmake/cmake.version b/cmake/cmake.version index 4abc854e71..5bb1c61ac2 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -2,7 +2,7 @@ IF (DEFINED VERNUMBER) SET(TD_VER_NUMBER ${VERNUMBER}) ELSE () - SET(TD_VER_NUMBER "3.2.4.0.alpha") + SET(TD_VER_NUMBER "3.3.1.0.alpha") ENDIF () IF (DEFINED VERCOMPATIBLE) diff --git a/docs/en/28-releases/01-tdengine.md b/docs/en/28-releases/01-tdengine.md index 902e62de73..d4487e6148 100644 --- a/docs/en/28-releases/01-tdengine.md +++ b/docs/en/28-releases/01-tdengine.md @@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://t import Release from "/components/ReleaseV3"; +## 3.3.0.0 + + + ## 3.2.3.0 diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md index 1c51f934fe..93b24c9b69 100644 --- a/docs/zh/28-releases/01-tdengine.md +++ b/docs/zh/28-releases/01-tdengine.md @@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do import Release from "/components/ReleaseV3"; +## 3.3.0.0 + + + ## 3.2.3.0 diff --git a/include/common/rsync.h b/include/common/rsync.h index 6cce645d1e..d570311694 100644 --- a/include/common/rsync.h +++ b/include/common/rsync.h @@ -13,9 +13,9 @@ extern "C" { void stopRsync(); void startRsync(); -int uploadRsync(char* id, char* path); -int downloadRsync(char* id, char* path); -int deleteRsync(char* id); +int uploadRsync(const char* id, const char* path); +int downloadRsync(const char* id, const char* path); +int deleteRsync(const char* id); #ifdef __cplusplus } diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index cb616f7afc..ce04ec6953 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -33,7 +33,6 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve bool isLeader, bool restored); int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); -int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta); int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta); int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 14aae0b96a..1db62abfc0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -530,6 +530,7 @@ typedef struct SStreamMeta { int32_t vgId; int64_t stage; int32_t role; + bool closeFlag; bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower. STaskStartInfo startInfo; TdThreadRwlock lock; @@ -553,6 +554,12 @@ typedef struct SStreamMeta { void* bkdChkptMgt; } SStreamMeta; +typedef struct STaskUpdateEntry { + int64_t streamId; + int32_t taskId; + int32_t transId; +} STaskUpdateEntry; + int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); @@ -792,7 +799,12 @@ SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t void streamTaskInputFail(SStreamTask* pTask); int32_t streamExecTask(SStreamTask* pTask); int32_t streamResumeTask(SStreamTask* pTask); -int32_t streamSchedExec(SStreamTask* pTask); +int32_t streamTrySchedExec(SStreamTask* pTask); +int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType); +int32_t streamTaskResumeInFuture(SStreamTask* pTask); +void streamTaskClearSchedIdleInfo(SStreamTask* pTask); +void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime); + bool streamTaskShouldStop(const SStreamTask* pStatus); bool streamTaskShouldPause(const SStreamTask* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); @@ -888,6 +900,8 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int64_t endTs, bool ready); int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta); int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); +void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId, + int64_t startTs); void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index ffab85761e..7aec0077e7 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -9,7 +9,7 @@ #define ERRNO_ERR_DATA errno,strerror(errno) // deleteRsync function produce empty directories, traverse base directory to remove them -static void removeEmptyDir(){ +static void removeEmptyDir() { TdDirPtr pDir = taosOpenDir(tsCheckpointBackupDir); if (pDir == NULL) return; @@ -53,7 +53,7 @@ static void changeDirFromWindowsToLinux(char* from, char* to){ } #endif -static int generateConfigFile(char* confDir){ +static int generateConfigFile(char* confDir) { TdFilePtr pFile = taosOpenFile(confDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { uError("[rsync] open conf file error, dir:%s,"ERRNO_ERR_FORMAT, confDir, ERRNO_ERR_DATA); @@ -111,7 +111,7 @@ static int execCommand(char* command){ return code; } -void stopRsync(){ +void stopRsync() { int code = #ifdef WINDOWS system("taskkill /f /im rsync.exe"); @@ -125,7 +125,7 @@ void stopRsync(){ uDebug("[rsync] stop rsync server successful"); } -void startRsync(){ +void startRsync() { if(taosMulMkDir(tsCheckpointBackupDir) != 0){ uError("[rsync] build checkpoint backup dir failed, dir:%s,"ERRNO_ERR_FORMAT, tsCheckpointBackupDir, ERRNO_ERR_DATA); return; @@ -151,7 +151,7 @@ void startRsync(){ uDebug("[rsync] start server successful"); } -int uploadRsync(char* id, char* path){ +int uploadRsync(const char* id, const char* path) { #ifdef WINDOWS char pathTransform[PATH_MAX] = {0}; changeDirFromWindowsToLinux(path, pathTransform); @@ -188,7 +188,7 @@ int uploadRsync(char* id, char* path){ return 0; } -int downloadRsync(char* id, char* path){ +int downloadRsync(const char* id, const char* path) { #ifdef WINDOWS char pathTransform[PATH_MAX] = {0}; changeDirFromWindowsToLinux(path, pathTransform); @@ -212,7 +212,7 @@ int downloadRsync(char* id, char* path){ return 0; } -int deleteRsync(char* id){ +int deleteRsync(const char* id) { char* tmp = "./tmp_empty/"; int code = taosMkDir(tmp); if(code != 0){ diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 3fc89c3d84..5322540670 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1490,6 +1490,7 @@ static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, i return -1; } + mInfo("trans:%d, add alter vnode hash range action for from vgId:%d to vgId:%d", pTrans->id, srcVgId, pVgroup->vgId); return 0; } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 19e53c7d15..5a29f67ae3 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -62,29 +62,14 @@ typedef struct SBuildScanWalMsgParam { } SBuildScanWalMsgParam; static void doStartScanWal(void* param, void* tmrId) { - SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*) param; - - int32_t vgId = pParam->pTq->pStreamMeta->vgId; - - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - taosMemoryFree(pParam); - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); - return; - } + SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; + STQ* pTq = pParam->pTq; + int32_t vgId = pTq->pStreamMeta->vgId; tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks, - pParam->pTq->pVnode->restored); - - pRunReq->head.vgId = vgId; - pRunReq->streamId = 0; - pRunReq->taskId = 0; - pRunReq->reqType = STREAM_EXEC_T_EXTRACT_WAL_DATA; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(&pParam->pTq->pVnode->msgCb, STREAM_QUEUE, &msg); + pTq->pVnode->restored); + /*int32_t code = */ streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); taosMemoryFree(pParam); } @@ -149,50 +134,19 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { return 0; } - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); - streamMetaWUnLock(pMeta); - return -1; - } - tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId, numOfTasks, alreadyRestored); - pRunReq->head.vgId = vgId; - pRunReq->streamId = 0; - pRunReq->taskId = 0; - pRunReq->reqType = STREAM_EXEC_T_EXTRACT_WAL_DATA; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); + int32_t code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); streamMetaWUnLock(pMeta); - return 0; + return code; } int32_t tqStopStreamTasksAsync(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t vgId = pMeta->vgId; - - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d failed to create msg to stop tasks async, code:%s", vgId, terrstr()); - return -1; - } - - tqDebug("vgId:%d create msg to stop all tasks async", vgId); - - pRunReq->head.vgId = vgId; - pRunReq->streamId = 0; - pRunReq->taskId = 0; - pRunReq->reqType = STREAM_EXEC_T_STOP_ALL_TASKS; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); - return 0; + return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS); } int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { @@ -408,7 +362,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { if ((numOfItems > 0) || hasNewData) { noDataInWal = false; - code = streamSchedExec(pTask); + code = streamTrySchedExec(pTask); if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pStreamMeta, pTask); return -1; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index e404f1e7b9..be3023122f 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -17,12 +17,6 @@ #include "tq.h" #include "tstream.h" -typedef struct STaskUpdateEntry { - int64_t streamId; - int32_t taskId; - int32_t transId; -} STaskUpdateEntry; - typedef struct SMStreamCheckpointReadyRspMsg { SMsgHead head; } SMStreamCheckpointReadyRspMsg; @@ -116,22 +110,10 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { return 0; } - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); - return -1; - } - tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks); - pRunReq->head.vgId = vgId; - pRunReq->streamId = 0; - pRunReq->taskId = 0; - pRunReq->reqType = restart ? STREAM_EXEC_T_RESTART_ALL_TASKS : STREAM_EXEC_T_START_ALL_TASKS; - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(cb, STREAM_QUEUE, &msg); - return 0; + int32_t type = restart ? STREAM_EXEC_T_RESTART_ALL_TASKS : STREAM_EXEC_T_START_ALL_TASKS; + return streamTaskSchedTask(cb, vgId, 0, 0, type); } int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) { @@ -143,22 +125,8 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream return 0; } - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d failed to create msg to start task:0x%x, code:%s", vgId, taskId, terrstr()); - return -1; - } - tqDebug("vgId:%d start task:0x%x async", vgId, taskId); - pRunReq->head.vgId = vgId; - pRunReq->streamId = streamId; - pRunReq->taskId = taskId; - pRunReq->reqType = STREAM_EXEC_T_START_ONE_TASK; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(cb, STREAM_QUEUE, &msg); - return 0; + return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK); } int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { @@ -259,6 +227,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } } + // save if (updated) { tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId); streamMetaSaveTask(pMeta, pTask); @@ -269,22 +238,15 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId); } + // stop streamTaskStop(pTask); - - // keep the already updated info - taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0); - - int64_t now = taosGetTimestampMs(); if (ppHTask != NULL) { streamTaskStop(*ppHTask); - tqDebug("s-task:%s vgId:%d task nodeEp update completed, streamTask/fill-history closed, elapsed:%" PRId64 " ms", - idstr, vgId, now - st); - taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0); - } else { - tqDebug("s-task:%s vgId:%d, task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", idstr, - vgId, now - st); } + // keep info + streamMetaAddIntoUpdateTaskList(pMeta, pTask, (ppHTask != NULL) ? (*ppHTask) : NULL, req.transId, st); + rsp.code = 0; // possibly only handle the stream task. @@ -307,10 +269,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } else { tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId); #if 0 - // for test purpose, to trigger the leader election - taosMSleep(5000); + taosMSleep(5000);// for test purpose, to trigger the leader election #endif - tqStreamTaskStartAsync(pMeta, cb, true); } } @@ -712,26 +672,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen return 0; } -int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta) { - int32_t vgId = pMeta->vgId; - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - - tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks); - if (numOfTasks == 0) { - return TSDB_CODE_SUCCESS; - } - - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); - - STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - streamTaskResetStatus(*pTask); - } - - return 0; -} - static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { int32_t vgId = pMeta->vgId; int32_t code = 0; @@ -781,7 +721,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { } else { streamMetaResetStartInfo(&pMeta->startInfo); streamMetaWUnLock(pMeta); - tqInfo("vgId:%d, follower node not start stream tasks", vgId); + tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId); } code = terrno; @@ -1006,7 +946,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t } else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) { tqScanWalAsync((STQ*)handle, false); } else { - streamSchedExec(pTask); + streamTrySchedExec(pTask); } } else if (status == TASK_STATUS__UNINIT) { // todo: fill-history task init ? diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index fd8b73b1f0..d72c1dec6a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -374,9 +374,9 @@ static SLastCol *tsdbCacheDeserializeV2(char const *value) { SLastCol *pLastCol = taosMemoryMalloc(sizeof(SLastCol)); *pLastCol = *(SLastCol *)(value); - char* currentPos = (char *)value + sizeof(*pLastCol); + char *currentPos = (char *)value + sizeof(*pLastCol); for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { - SValue* pValue = &pLastCol->rowKey.pks[i]; + SValue *pValue = &pLastCol->rowKey.pks[i]; if (IS_VAR_DATA_TYPE(pValue->type)) { if (pValue->nData > 0) { pValue->pData = currentPos; @@ -434,11 +434,10 @@ static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { length += pColVal->value.nData; } - // set version *value = taosMemoryMalloc(length); // copy last col - SLastCol* pToLastCol = (SLastCol *)(*value); + SLastCol *pToLastCol = (SLastCol *)(*value); *pToLastCol = *pLastCol; char *currentPos = *value + sizeof(*pLastCol); @@ -545,9 +544,7 @@ static void reallocVarDataVal(SValue *pValue) { } } -static void reallocVarData(SColVal *pColVal) { - reallocVarDataVal(&pColVal->value); -} +static void reallocVarData(SColVal *pColVal) { reallocVarDataVal(&pColVal->value); } static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) { SLastCol *pLastCol = (SLastCol *)value; @@ -1095,7 +1092,9 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) { char *value = NULL; size_t vlen = 0; - tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen); + tsdbCacheSerialize(&(SLastCol){.version = LAST_COL_VERSION, .rowKey = *pRowKey, .colVal = *pColVal}, &value, + &vlen); + // SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid}; taosThreadMutexLock(&pTsdb->rCache.rMutex); @@ -1135,7 +1134,9 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) { char *value = NULL; size_t vlen = 0; - tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen); + tsdbCacheSerialize(&(SLastCol){.version = LAST_COL_VERSION, .rowKey = *pRowKey, .colVal = *pColVal}, &value, + &vlen); + // SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid}; taosThreadMutexLock(&pTsdb->rCache.rMutex); @@ -1409,7 +1410,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr for (int i = 0; i < num_keys; ++i) { SIdxKey *idxKey = taosArrayGet(remainCols, i); slotIds[i] = pr->pSlotIds[idxKey->idx]; - if (idxKey->key.lflag == CACHESCAN_RETRIEVE_LAST >> 3) { + if (IS_LAST_KEY(idxKey->key)) { if (NULL == lastTmpIndexArray) { lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t)); } @@ -1548,6 +1549,8 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA keys_list_sizes, values_list, values_list_sizes, errs); for (int i = 0; i < num_keys; ++i) { if (errs[i]) { + tsdbError("vgId:%d, %s failed at line %d since %s, index:%d", TD_VID(pTsdb->pVnode), __func__, __LINE__, errs[i], + i); rocksdb_free(errs[i]); } } @@ -1559,7 +1562,7 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA SLRUCache *pCache = pTsdb->lruCache; for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) { SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); - SLastCol* PToFree = pLastCol; + SLastCol *PToFree = pLastCol; SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j]; if (pLastCol) { SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); @@ -1621,7 +1624,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache for (int i = 0; i < num_keys; ++i) { int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i]; - SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = cid}; + SLastKey key = {.lflag = ltype, .uid = uid, .cid = cid}; // for select last_row, last case int32_t funcType = FUNCTION_TYPE_CACHE_LAST; if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) { @@ -1629,10 +1632,10 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache } if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) { int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST); - key->lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3; + key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3; } - LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); + LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN); if (h) { SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); @@ -1654,7 +1657,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache if (!remainCols) { remainCols = taosArrayInit(num_keys, sizeof(SIdxKey)); } - taosArrayPush(remainCols, &(SIdxKey){i, *key}); + taosArrayPush(remainCols, &(SIdxKey){i, key}); } } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 67c4a8d875..498b46dcfe 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2679,7 +2679,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_avg_partial", .type = FUNCTION_TYPE_AVG_PARTIAL, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_IGNORE_NULL_FUNC, + .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateAvgPartial, .dataRequiredFunc = statisDataRequired, .getEnvFunc = getAvgFuncEnv, @@ -2694,7 +2694,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_avg_merge", .type = FUNCTION_TYPE_AVG_MERGE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_IGNORE_NULL_FUNC, + .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateAvgMerge, .getEnvFunc = getAvgFuncEnv, .initFunc = avgFunctionSetup, diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 687a9c6d4c..1ec3e6192d 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1998,6 +1998,82 @@ static int32_t pdcOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) return pdcOptimizeImpl(pCxt, pLogicSubplan->pNode); } + +static bool eliminateNotNullCondMayBeOptimized(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode)) { + return false; + } + + SAggLogicNode* pAgg = (SAggLogicNode*)pNode; + if (pNode->pChildren->length != 1 || NULL != pAgg->pGroupKeys) { + return false; + } + + SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0); + if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) { + return false; + } + + SScanLogicNode* pScan = (SScanLogicNode*)pChild; + if (NULL == pScan->node.pConditions || QUERY_NODE_OPERATOR != nodeType(pScan->node.pConditions)) { + return false; + } + + SOperatorNode* pOp = (SOperatorNode*)pScan->node.pConditions; + if (OP_TYPE_IS_NOT_NULL != pOp->opType) { + return false; + } + + if (QUERY_NODE_COLUMN != nodeType(pOp->pLeft)) { + return false; + } + + SNode* pTmp = NULL; + FOREACH(pTmp, pAgg->pAggFuncs) { + SFunctionNode* pFunc = (SFunctionNode*)pTmp; + if (!fmIsIgnoreNullFunc(pFunc->funcId)) { + return false; + } + if (fmIsMultiResFunc(pFunc->funcId)) { + SNode* pParam = NULL; + FOREACH(pParam, pFunc->pParameterList) { + if (QUERY_NODE_COLUMN != nodeType(pParam)) { + return false; + } + if (!nodesEqualNode(pParam, pOp->pLeft)) { + return false; + } + } + } else { + SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0); + if (QUERY_NODE_COLUMN != nodeType(pParam)) { + return false; + } + if (!nodesEqualNode(pParam, pOp->pLeft)) { + return false; + } + } + } + + return true; +} + +static int32_t eliminateNotNullCondOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + SLogicNode* pNode = (SLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, eliminateNotNullCondMayBeOptimized); + if (NULL == pNode) { + return TSDB_CODE_SUCCESS; + } + + SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0); + nodesDestroyNode(pScan->node.pConditions); + pScan->node.pConditions = NULL; + + pCxt->optimized = true; + + return TSDB_CODE_SUCCESS; +} + + static bool sortPriKeyOptIsPriKeyOrderBy(SNodeList* pSortKeys) { if (1 != LIST_LENGTH(pSortKeys)) { return false; @@ -6670,6 +6746,7 @@ static int32_t tsmaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan static const SOptimizeRule optimizeRuleSet[] = { {.pName = "ScanPath", .optimizeFunc = scanPathOptimize}, {.pName = "PushDownCondition", .optimizeFunc = pdcOptimize}, + {.pName = "EliminateNotNullCond", .optimizeFunc = eliminateNotNullCondOptimize}, {.pName = "JoinCondOptimize", .optimizeFunc = joinCondOptimize}, {.pName = "HashJoin", .optimizeFunc = hashJoinOptimize}, {.pName = "StableJoin", .optimizeFunc = stableJoinOptimize}, diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index d129b0024f..c7f6c20b7d 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -287,6 +287,7 @@ typedef struct SSchJob { SExplainCtx *explainCtx; int8_t status; + int8_t inRetry; SQueryNodeAddr resNode; tsem_t rspSem; SSchOpStatus opStatus; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 48aab63ba3..380862f745 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -851,9 +851,18 @@ int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) { return TSDB_CODE_SUCCESS; } -int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode) { +int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) { + int8_t origInRetry = atomic_val_compare_exchange_8(&pJob->inRetry, 0, 1); + if (0 != origInRetry) { + SCH_JOB_DLOG("job already in retry, origInRetry: %d", pJob->inRetry); + return TSDB_CODE_SCH_IGNORE_ERROR; + } + + *inRetry = true; + SCH_ERR_RET(schChkResetJobRetry(pJob, rspCode)); + int32_t code = 0; int32_t numOfLevels = taosArrayGetSize(pJob->levels); for (int32_t i = 0; i < numOfLevels; ++i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); @@ -865,7 +874,11 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode) { for (int32_t j = 0; j < numOfTasks; ++j) { SSchTask *pTask = taosArrayGet(pLevel->subTasks, j); SCH_LOCK_TASK(pTask); - SCH_ERR_RET(schChkUpdateRedirectCtx(pJob, pTask, NULL, rspCode)); + code = schChkUpdateRedirectCtx(pJob, pTask, NULL, rspCode); + if (TSDB_CODE_SUCCESS != code) { + SCH_UNLOCK_TASK(pTask); + SCH_RET(code); + } qClearSubplanExecutionNode(pTask->plan); schResetTaskForRetry(pJob, pTask); SCH_UNLOCK_TASK(pTask); @@ -880,6 +893,7 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode) { int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; + bool inRetry = false; taosMemoryFreeClear(pMsg->pData); taosMemoryFreeClear(pMsg->pEpSet); @@ -888,19 +902,27 @@ int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_ SCH_TASK_DLOG("start to redirect all job tasks cause of error: %s", tstrerror(rspCode)); - SCH_ERR_JRET(schResetJobForRetry(pJob, rspCode)); + SCH_ERR_JRET(schResetJobForRetry(pJob, rspCode, &inRetry)); SCH_ERR_JRET(schLaunchJob(pJob)); SCH_LOCK_TASK(pTask); + atomic_store_8(&pJob->inRetry, 0); + SCH_RET(code); _return: SCH_LOCK_TASK(pTask); - SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); + code = schProcessOnTaskFailure(pJob, pTask, code); + + if (inRetry) { + atomic_store_8(&pJob->inRetry, 0); + } + + SCH_RET(code); } bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) { diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c deleted file mode 100644 index 7830bbdd39..0000000000 --- a/source/libs/stream/src/stream.c +++ /dev/null @@ -1,308 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "streamInt.h" -#include "ttimer.h" - -void* streamTimer = NULL; - -int32_t streamTimerInit() { - streamTimer = taosTmrInit(1000, 100, 10000, "STREAM"); - if (streamTimer == NULL) { - stError("init stream timer failed, code:%s", tstrerror(terrno)); - return -1; - } - - stInfo("init stream timer, %p", streamTimer); - return 0; -} - -void streamTimerCleanUp() { - stInfo("cleanup stream timer, %p", streamTimer); - taosTmrCleanUp(streamTimer); - streamTimer = NULL; -} - -tmr_h streamTimerGetInstance() { - return streamTimer; -} - -char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { - char buf[128] = {0}; - sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId); - return taosStrdup(buf); -} - -static void streamSchedByTimer(void* param, void* tmrId) { - SStreamTask* pTask = (void*)param; - const char* id = pTask->id.idStr; - int32_t nextTrigger = (int32_t)pTask->info.triggerParam; - - int8_t status = atomic_load_8(&pTask->schedInfo.status); - stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger); - - if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { - stDebug("s-task:%s jump out of schedTimer", id); - return; - } - - if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { - stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger); - } else { - if (status == TASK_TRIGGER_STATUS__ACTIVE) { - SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0); - if (pTrigger == NULL) { - stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", - nextTrigger); - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); - return; - } - - pTrigger->type = STREAM_INPUT__GET_RES; - pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - if (pTrigger->pBlock == NULL) { - taosFreeQitem(pTrigger); - - stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", - nextTrigger); - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); - return; - } - - atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE); - pTrigger->pBlock->info.type = STREAM_GET_ALL; - - int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); - if (code != TSDB_CODE_SUCCESS) { - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); - return; - } - - streamSchedExec(pTask); - } - } - - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); -} - -int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { - if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) { - int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); - ASSERT(ref == 2 && pTask->schedInfo.pDelayTimer == NULL); - - stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam); - - pTask->schedInfo.pDelayTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer); - pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; - } - - return 0; -} - -int32_t streamSchedExec(SStreamTask* pTask) { - if (streamTaskSetSchedStatusWait(pTask)) { - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - /*int8_t status = */streamTaskSetSchedStatusInactive(pTask); - stError("failed to create msg to aunch s-task:%s, reason out of memory", pTask->id.idStr); - return -1; - } - - pRunReq->head.vgId = pTask->info.nodeId; - pRunReq->streamId = pTask->id.streamId; - pRunReq->taskId = pTask->id.taskId; - - stDebug("trigger to run s-task:%s", pTask->id.idStr); - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg); - } else { - stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus); - } - - return 0; -} - -static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) { - *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); - if (*pBuf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId); - ASSERT(((SMsgHead*)(*pBuf))->vgId != 0); - - SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead)); - - pDispatchRsp->stage = htobe64(pReq->stage); - pDispatchRsp->msgId = htonl(pReq->msgId); - pDispatchRsp->inputStatus = status; - pDispatchRsp->streamId = htobe64(pReq->streamId); - pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); - pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); - pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId); - pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); - - return TSDB_CODE_SUCCESS; -} - -static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) { - int8_t status = 0; - - SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId); - if (pBlock == NULL) { - streamTaskInputFail(pTask); - status = TASK_INPUT_STATUS__FAILED; - stError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, - pTask->id.idStr); - } else { - if (pBlock->type == STREAM_INPUT__TRANS_STATE) { - pTask->status.appendTranstateBlock = true; - } - - int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pBlock); - // input queue is full, upstream is blocked now - status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED; - } - - return status; -} - -int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) { - SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock)); - int8_t status = TASK_INPUT_STATUS__NORMAL; - - // enqueue - if (pData != NULL) { - stDebug("s-task:%s (child %d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, - pTask->info.selfChildId, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId); - - pData->type = STREAM_INPUT__DATA_RETRIEVE; - pData->srcVgId = 0; - streamRetrieveReqToData(pReq, pData); - if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData) == 0) { - status = TASK_INPUT_STATUS__NORMAL; - } else { - status = TASK_INPUT_STATUS__FAILED; - } - } else { // todo handle oom - /*streamTaskInputFail(pTask);*/ - /*status = TASK_INPUT_STATUS__FAILED;*/ - } - - return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; -} - -int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { - int32_t status = 0; - SStreamMeta* pMeta = pTask->pMeta; - const char* id = pTask->id.idStr; - - stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id, - pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen, pReq->msgId); - - SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); - ASSERT(pInfo != NULL); - - if (pMeta->role == NODE_ROLE_FOLLOWER) { - stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id); - status = TASK_INPUT_STATUS__REFUSED; - } else { - if (pReq->stage > pInfo->stage) { - // upstream task has restarted/leader-follower switch/transferred to other dnodes - stError("s-task:%s upstream task:0x%x (vgId:%d) has restart/leader-switch/vnode-transfer, prev stage:%" PRId64 - ", current:%" PRId64 " dispatch msg rejected", - id, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->stage, pReq->stage); - status = TASK_INPUT_STATUS__REFUSED; - } else { - if (!pInfo->dataAllowed) { - stWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", id, pReq->upstreamTaskId); - status = TASK_INPUT_STATUS__BLOCKED; - } else { - // This task has received the checkpoint req from the upstream task, from which all the messages should be - // blocked. Note that there is no race condition here. - if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); - streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); - stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId); - } else if (pReq->type == STREAM_INPUT__TRANS_STATE) { - atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); - streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); - - // disable the related stream task here to avoid it to receive the newly arrived data after the transfer-state - STaskId* pRelTaskId = &pTask->streamTaskId; - SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pRelTaskId->streamId, pRelTaskId->taskId); - if (pStreamTask != NULL) { - atomic_add_fetch_32(&pStreamTask->upstreamInfo.numOfClosed, 1); - streamTaskCloseUpstreamInput(pStreamTask, pReq->upstreamRelTaskId); - streamMetaReleaseTask(pMeta, pStreamTask); - } - - stDebug("s-task:%s close inputQ for upstream:0x%x since trans-state msgId:%d recv, rel stream-task:0x%" PRIx64 - " close inputQ for upstream:0x%x", - id, pReq->upstreamTaskId, pReq->msgId, pTask->streamTaskId.taskId, pReq->upstreamRelTaskId); - } - - status = streamTaskAppendInputBlocks(pTask, pReq); - } - } - } - - // disable the data from upstream tasks -// if (streamTaskGetStatus(pTask)->state == TASK_STATUS__HALT) { -// status = TASK_INPUT_STATUS__BLOCKED; -// } - - { - // do send response with the input status - int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s failed to build dispatch rsp, msgId:%d, code:%s", id, pReq->msgId, tstrerror(code)); - terrno = code; - return code; - } - - pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); - tmsgSendRsp(pRsp); - } - - streamSchedExec(pTask); - - return 0; -} - -int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) { - int32_t code = streamTaskEnqueueRetrieve(pTask, pReq); - if(code != 0){ - return code; - } - return streamSchedExec(pTask); -} - -void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); } - -SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { - int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); - for (int32_t i = 0; i < num; ++i) { - SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); - if (pInfo->taskId == taskId) { - return pInfo; - } - } - - stError("s-task:%s failed to find upstream task:0x%x", pTask->id.idStr, taskId); - return NULL; -} \ No newline at end of file diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 08118f5231..358795a4a2 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1964,6 +1964,7 @@ STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath) { stInfo("newly create db, need to restart"); // pre create db pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err); + if (pTaskDb->db == NULL) goto _EXIT; rocksdb_close(pTaskDb->db); if (cfNames != NULL) { diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index d356a504c6..f0f12cae2b 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -341,24 +341,24 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* taosThreadMutexLock(&pInfo->checkInfoLock); } - if (!pInfo->inCheckProcess) { -// stWarn("s-task:%s already not in-check-procedure", id); + if (pInfo->inCheckProcess) { + int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0; + stDebug("s-task:%s clear the in check-rsp flag, set the check-rsp done, elapsed time:%" PRId64 " ms", id, el); + + pInfo->startTs = 0; + pInfo->timeoutStartTs = 0; + pInfo->notReadyTasks = 0; + pInfo->inCheckProcess = 0; + pInfo->stopCheckProcess = 0; + + pInfo->notReadyRetryCount = 0; + pInfo->timeoutRetryCount = 0; + + taosArrayClear(pInfo->pList); + } else { + stDebug("s-task:%s already not in check-rsp procedure", id); } - int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0; - stDebug("s-task:%s clear the in check-rsp flag, not in check-rsp anymore, elapsed time:%" PRId64 " ms", id, el); - - pInfo->startTs = 0; - pInfo->timeoutStartTs = 0; - pInfo->notReadyTasks = 0; - pInfo->inCheckProcess = 0; - pInfo->stopCheckProcess = 0; - - pInfo->notReadyRetryCount = 0; - pInfo->timeoutRetryCount = 0; - - taosArrayClear(pInfo->pList); - if (lock) { taosThreadMutexUnlock(&pInfo->checkInfoLock); } @@ -527,23 +527,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { // The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution // of restart in timer thread will result in a dead lock. int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) { - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - stError("vgId:%d failed to create msg to stop tasks async, code:%s", vgId, terrstr()); - return -1; - } - - stDebug("vgId:%d create msg add failed s-task:0x%x", vgId, taskId); - - pRunReq->head.vgId = vgId; - pRunReq->streamId = streamId; - pRunReq->taskId = taskId; - pRunReq->reqType = STREAM_EXEC_T_ADD_FAILED_TASK; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); - return 0; + return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK); } // this function is executed in timer thread diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index f3e83eb190..d2f9a0cbc3 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -30,8 +30,8 @@ typedef struct { static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); static int32_t deleteCheckpointFile(const char* id, const char* name); -static int32_t streamTaskBackupCheckpoint(char* id, char* path); -static int32_t deleteCheckpoint(char* id); +static int32_t streamTaskBackupCheckpoint(const char* id, const char* path); +static int32_t deleteCheckpoint(const char* id); int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; @@ -157,7 +157,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint return TSDB_CODE_OUT_OF_MEMORY; } - streamSchedExec(pTask); + streamTrySchedExec(pTask); return TSDB_CODE_SUCCESS; } @@ -578,7 +578,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { return code; } -static int32_t uploadCheckpointToS3(char* id, char* path) { +static int32_t uploadCheckpointToS3(const char* id, const char* path) { TdDirPtr pDir = taosOpenDir(path); if (pDir == NULL) return -1; @@ -631,7 +631,7 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() { } } -int32_t streamTaskBackupCheckpoint(char* id, char* path) { +int32_t streamTaskBackupCheckpoint(const char* id, const char* path) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { stError("streamTaskBackupCheckpoint parameters invalid"); return -1; @@ -675,7 +675,7 @@ int32_t streamTaskDownloadCheckpointData(char* id, char* path) { return 0; } -int32_t deleteCheckpoint(char* id) { +int32_t deleteCheckpoint(const char* id) { if (id == NULL || strlen(id) == 0) { stError("deleteCheckpoint parameters invalid"); return -1; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index baf5ebf8cb..d56b347f4c 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1137,6 +1137,129 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i return 0; } +static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) { + *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); + if (*pBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId); + ASSERT(((SMsgHead*)(*pBuf))->vgId != 0); + + SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead)); + + pDispatchRsp->stage = htobe64(pReq->stage); + pDispatchRsp->msgId = htonl(pReq->msgId); + pDispatchRsp->inputStatus = status; + pDispatchRsp->streamId = htobe64(pReq->streamId); + pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); + pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); + pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId); + pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); + + return TSDB_CODE_SUCCESS; +} + +static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) { + int8_t status = 0; + + SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId); + if (pBlock == NULL) { + streamTaskInputFail(pTask); + status = TASK_INPUT_STATUS__FAILED; + stError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, + pTask->id.idStr); + } else { + if (pBlock->type == STREAM_INPUT__TRANS_STATE) { + pTask->status.appendTranstateBlock = true; + } + + int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pBlock); + // input queue is full, upstream is blocked now + status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED; + } + + return status; +} + +int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { + int32_t status = 0; + SStreamMeta* pMeta = pTask->pMeta; + const char* id = pTask->id.idStr; + + stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id, + pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen, pReq->msgId); + + SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); + ASSERT(pInfo != NULL); + + if (pMeta->role == NODE_ROLE_FOLLOWER) { + stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id); + status = TASK_INPUT_STATUS__REFUSED; + } else { + if (pReq->stage > pInfo->stage) { + // upstream task has restarted/leader-follower switch/transferred to other dnodes + stError("s-task:%s upstream task:0x%x (vgId:%d) has restart/leader-switch/vnode-transfer, prev stage:%" PRId64 + ", current:%" PRId64 " dispatch msg rejected", + id, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->stage, pReq->stage); + status = TASK_INPUT_STATUS__REFUSED; + } else { + if (!pInfo->dataAllowed) { + stWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", id, pReq->upstreamTaskId); + status = TASK_INPUT_STATUS__BLOCKED; + } else { + // This task has received the checkpoint req from the upstream task, from which all the messages should be + // blocked. Note that there is no race condition here. + if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); + streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); + stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId); + } else if (pReq->type == STREAM_INPUT__TRANS_STATE) { + atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); + streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); + + // disable the related stream task here to avoid it to receive the newly arrived data after the transfer-state + STaskId* pRelTaskId = &pTask->streamTaskId; + SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pRelTaskId->streamId, pRelTaskId->taskId); + if (pStreamTask != NULL) { + atomic_add_fetch_32(&pStreamTask->upstreamInfo.numOfClosed, 1); + streamTaskCloseUpstreamInput(pStreamTask, pReq->upstreamRelTaskId); + streamMetaReleaseTask(pMeta, pStreamTask); + } + + stDebug("s-task:%s close inputQ for upstream:0x%x since trans-state msgId:%d recv, rel stream-task:0x%" PRIx64 + " close inputQ for upstream:0x%x", + id, pReq->upstreamTaskId, pReq->msgId, pTask->streamTaskId.taskId, pReq->upstreamRelTaskId); + } + + status = streamTaskAppendInputBlocks(pTask, pReq); + } + } + } + + // disable the data from upstream tasks +// if (streamTaskGetStatus(pTask)->state == TASK_STATUS__HALT) { +// status = TASK_INPUT_STATUS__BLOCKED; +// } + + { + // do send response with the input status + int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to build dispatch rsp, msgId:%d, code:%s", id, pReq->msgId, tstrerror(code)); + terrno = code; + return code; + } + + pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); + tmsgSendRsp(pRsp); + } + + streamTrySchedExec(pTask); + + return 0; +} + int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 250866005e..047b169ec9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -533,8 +533,7 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc return code; } -static void setTaskSchedInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; } -static void clearTaskSchedInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; } +//static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; } static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; } /** @@ -559,26 +558,26 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { if (streamQueueIsFull(pTask->outputq.queue)) { stWarn("s-task:%s outputQ is full, idle for 500ms and retry", id); - setTaskSchedInfo(pTask, 500); + streamTaskSetIdleInfo(pTask, 500); return 0; } if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", id); - setTaskSchedInfo(pTask, 1000); + streamTaskSetIdleInfo(pTask, 1000); return 0; } if (taosGetTimestampMs() - pTask->status.lastExecTs < MIN_INVOKE_INTERVAL) { stDebug("s-task:%s invoke with high frequency, idle and retry exec in 50ms", id); - setTaskSchedInfo(pTask, MIN_INVOKE_INTERVAL); + streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL); return 0; } EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); if (ret == EXEC_AFTER_IDLE) { ASSERT(pInput == NULL && numOfBlocks == 0); - setTaskSchedInfo(pTask, MIN_INVOKE_INTERVAL); + streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL); return 0; } else { if (pInput == NULL) { @@ -720,66 +719,6 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) { } } -static void doStreamExecTaskHelper(void* param, void* tmrId) { - SStreamTask* pTask = (SStreamTask*)param; - - SStreamTaskState* p = streamTaskGetStatus(pTask); - if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) { - streamTaskSetSchedStatusInactive(pTask); - - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s not resume task, ref:%d", pTask->id.idStr, p->name, ref); - - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - // task resume running - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - /*int8_t status = */streamTaskSetSchedStatusInactive(pTask); - - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stError("failed to create msg to resume s-task:%s, reason out of memory, ref:%d", pTask->id.idStr, ref); - - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - pRunReq->head.vgId = pTask->info.nodeId; - pRunReq->streamId = pTask->id.streamId; - pRunReq->taskId = pTask->id.taskId; - pRunReq->reqType = STREAM_EXEC_T_RESUME_TASK; - - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("trigger to resume s-task:%s after being idled for %dms, ref:%d", pTask->id.idStr, pTask->status.schedIdleTime, ref); - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg); - - // release the task ref count - clearTaskSchedInfo(pTask); - streamMetaReleaseTask(pTask->pMeta, pTask); -} - -static int32_t schedTaskInFuture(SStreamTask* pTask) { - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s task should idle, add into timer to retry in %dms, ref:%d", pTask->id.idStr, - pTask->status.schedIdleTime, ref); - - // add one ref count for task - /*SStreamTask* pAddRefTask = */streamMetaAcquireOneTask(pTask); - - if (pTask->schedInfo.pIdleTimer == NULL) { - pTask->schedInfo.pIdleTimer = taosTmrStart(doStreamExecTaskHelper, pTask->status.schedIdleTime, pTask, streamTimer); - } else { - taosTmrReset(doStreamExecTaskHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer); - } - - return TSDB_CODE_SUCCESS; -} - int32_t streamResumeTask(SStreamTask* pTask) { ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__ACTIVE); const char* id = pTask->id.idStr; @@ -793,7 +732,7 @@ int32_t streamResumeTask(SStreamTask* pTask) { int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - clearTaskSchedInfo(pTask); + streamTaskClearSchedIdleInfo(pTask); taosThreadMutexUnlock(&pTask->lock); setLastExecTs(pTask, taosGetTimestampMs()); @@ -806,7 +745,7 @@ int32_t streamResumeTask(SStreamTask* pTask) { } else { // check if this task needs to be idle for a while if (pTask->status.schedIdleTime > 0) { - schedTaskInFuture(pTask); + streamTaskResumeInFuture(pTask); taosThreadMutexUnlock(&pTask->lock); setLastExecTs(pTask, taosGetTimestampMs()); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a091a866a0..8218319309 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -373,6 +373,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->numOfPausedTasks = 0; pMeta->numOfStreamTasks = 0; + pMeta->closeFlag = false; stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage); @@ -1215,7 +1216,7 @@ void metaHbToMnode(void* param, void* tmrId) { } // need to stop, stop now - if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { + if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { // todo refactor: not need this now, use closeFlag in Meta pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP; stDebug("vgId:%d jump out of meta timer", pMeta->vgId); taosReleaseRef(streamMetaId, rid); @@ -1281,6 +1282,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { streamMetaWLock(pMeta); + pMeta->closeFlag = true; + void* pIter = NULL; while (1) { pIter = taosHashIterate(pMeta->pTasksMap, pIter); @@ -1304,7 +1307,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { } } - stDebug("vgId:%d start to check all tasks", vgId); + stDebug("vgId:%d start to check all tasks for closing", vgId); int64_t st = taosGetTimestampMs(); while (streamMetaTaskInTimer(pMeta)) { @@ -1438,35 +1441,47 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) } } -static SArray* prepareBeforeStartTasks(SStreamMeta* pMeta) { +static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now) { streamMetaWLock(pMeta); - SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL); + if (pMeta->closeFlag) { + streamMetaWUnLock(pMeta); + stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId); + return -1; + } + + *pList = taosArrayDup(pMeta->pTaskList, NULL); + taosHashClear(pMeta->startInfo.pReadyTaskSet); taosHashClear(pMeta->startInfo.pFailedTaskSet); - pMeta->startInfo.startTs = taosGetTimestampMs(); + pMeta->startInfo.startTs = now; streamMetaResetTaskStatus(pMeta); streamMetaWUnLock(pMeta); - return pTaskList; + return TSDB_CODE_SUCCESS; } int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { int32_t code = TSDB_CODE_SUCCESS; int32_t vgId = pMeta->vgId; + int64_t now = taosGetTimestampMs(); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - stInfo("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); + stInfo("vgId:%d start to check all %d stream task(s) downstream status, start ts:%"PRId64, vgId, numOfTasks, now); if (numOfTasks == 0) { - stInfo("vgId:%d start tasks completed", pMeta->vgId); + stInfo("vgId:%d no tasks to be started", pMeta->vgId); return TSDB_CODE_SUCCESS; } - int64_t now = taosGetTimestampMs(); + SArray* pTaskList = NULL; + code = prepareBeforeStartTasks(pMeta, &pTaskList, now); + if (code != TSDB_CODE_SUCCESS) { + ASSERT(pTaskList == NULL); + return TSDB_CODE_SUCCESS; + } - SArray* pTaskList = prepareBeforeStartTasks(pMeta); numOfTasks = taosArrayGetSize(pTaskList); // broadcast the check downstream tasks msg @@ -1742,4 +1757,26 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta } return code; +} + +void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId, + int64_t startTs) { + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; + + // keep the already updated info + STaskUpdateEntry entry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId}; + taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0); + + int64_t el = taosGetTimestampMs() - startTs; + if (pHTask != NULL) { + STaskUpdateEntry hEntry = {.streamId = pHTask->id.streamId, .taskId = pHTask->id.taskId, .transId = transId}; + taosHashPut(pMeta->updateInfo.pTasks, &hEntry, sizeof(hEntry), NULL, 0); + + stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64 + " ms", id, vgId, transId, el); + } else { + stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", + id, vgId, transId, el); + } } \ No newline at end of file diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 9e872a1aff..0ac282c362 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -424,4 +424,6 @@ void streamTaskPutbackToken(STokenBucket* pBucket) { // size in KB void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { pBucket->quotaRemain -= SIZE_IN_MiB(bytes); -} \ No newline at end of file +} + +void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); } \ No newline at end of file diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c new file mode 100644 index 0000000000..2e337234b6 --- /dev/null +++ b/source/libs/stream/src/streamSched.c @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "streamInt.h" +#include "ttimer.h" + +static void streamTaskResumeHelper(void* param, void* tmrId); +static void streamSchedByTimer(void* param, void* tmrId); + +int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { + if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) { + int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); + ASSERT(ref == 2 && pTask->schedInfo.pDelayTimer == NULL); + + stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam); + + pTask->schedInfo.pDelayTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer); + pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; + } + + return 0; +} + +int32_t streamTrySchedExec(SStreamTask* pTask) { + if (streamTaskSetSchedStatusWait(pTask)) { + streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pTask->id.streamId, pTask->id.taskId, 0); + } else { + stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus); + } + + return 0; +} + +int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType) { + SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); + if (pRunReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType, + terrstr()); + return -1; + } + + stDebug("vgId:%d create msg to start stream task:0x%x, exec type:%d", vgId, taskId, execType); + + pRunReq->head.vgId = vgId; + pRunReq->streamId = streamId; + pRunReq->taskId = taskId; + pRunReq->reqType = execType; + + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; + tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); + return TSDB_CODE_SUCCESS; +} + +void streamTaskClearSchedIdleInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; } + +void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; } + +int32_t streamTaskResumeInFuture(SStreamTask* pTask) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s task should idle, add into timer to retry in %dms, ref:%d", pTask->id.idStr, + pTask->status.schedIdleTime, ref); + + // add one ref count for task + /*SStreamTask* pAddRefTask = */streamMetaAcquireOneTask(pTask); + + if (pTask->schedInfo.pIdleTimer == NULL) { + pTask->schedInfo.pIdleTimer = taosTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer); + } else { + taosTmrReset(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer); + } + + return TSDB_CODE_SUCCESS; +} + +////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +void streamTaskResumeHelper(void* param, void* tmrId) { + SStreamTask* pTask = (SStreamTask*)param; + SStreamTaskId* pId = &pTask->id; + SStreamTaskState* p = streamTaskGetStatus(pTask); + + if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) { + streamTaskSetSchedStatusInactive(pTask); + + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s status:%s not resume task, ref:%d", pId->idStr, p->name, ref); + + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK); + + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("trigger to resume s-task:%s after being idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime, + ref); + + // release the task ref count + streamTaskClearSchedIdleInfo(pTask); + streamMetaReleaseTask(pTask->pMeta, pTask); +} + +void streamSchedByTimer(void* param, void* tmrId) { + SStreamTask* pTask = (void*)param; + const char* id = pTask->id.idStr; + int32_t nextTrigger = (int32_t)pTask->info.triggerParam; + + int8_t status = atomic_load_8(&pTask->schedInfo.status); + stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger); + + if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { + stDebug("s-task:%s jump out of schedTimer", id); + return; + } + + if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { + stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger); + } else { + if (status == TASK_TRIGGER_STATUS__ACTIVE) { + SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0); + if (pTrigger == NULL) { + stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", + nextTrigger); + taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); + return; + } + + pTrigger->type = STREAM_INPUT__GET_RES; + pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (pTrigger->pBlock == NULL) { + taosFreeQitem(pTrigger); + + stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", + nextTrigger); + taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); + return; + } + + atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE); + pTrigger->pBlock->info.type = STREAM_GET_ALL; + + int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); + if (code != TSDB_CODE_SUCCESS) { + taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); + return; + } + + streamTrySchedExec(pTask); + } + } + + taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); +} diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 9f4e6aaea1..c7e34987ab 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -992,3 +992,55 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) { tmsgSendReq(&pTask->info.mnodeEpset, &msg); return 0; } + +SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { + int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); + for (int32_t i = 0; i < num; ++i) { + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); + if (pInfo->taskId == taskId) { + return pInfo; + } + } + + stError("s-task:%s failed to find upstream task:0x%x", pTask->id.idStr, taskId); + return NULL; +} + +char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { + char buf[128] = {0}; + sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId); + return taosStrdup(buf); +} + +static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) { + SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock)); + int8_t status = TASK_INPUT_STATUS__NORMAL; + + // enqueue + if (pData != NULL) { + stDebug("s-task:%s (child %d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, + pTask->info.selfChildId, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId); + + pData->type = STREAM_INPUT__DATA_RETRIEVE; + pData->srcVgId = 0; + streamRetrieveReqToData(pReq, pData); + if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData) == 0) { + status = TASK_INPUT_STATUS__NORMAL; + } else { + status = TASK_INPUT_STATUS__FAILED; + } + } else { // todo handle oom + /*streamTaskInputFail(pTask);*/ + /*status = TASK_INPUT_STATUS__FAILED;*/ + } + + return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; +} + +int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) { + int32_t code = streamTaskEnqueueRetrieve(pTask, pReq); + if(code != 0){ + return code; + } + return streamTrySchedExec(pTask); +} \ No newline at end of file diff --git a/source/libs/stream/src/streamTimer.c b/source/libs/stream/src/streamTimer.c new file mode 100644 index 0000000000..6e956e2682 --- /dev/null +++ b/source/libs/stream/src/streamTimer.c @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "streamInt.h" +#include "ttimer.h" + +void* streamTimer = NULL; + +int32_t streamTimerInit() { + streamTimer = taosTmrInit(1000, 100, 10000, "STREAM"); + if (streamTimer == NULL) { + stError("init stream timer failed, code:%s", tstrerror(terrno)); + return -1; + } + + stInfo("init stream timer, %p", streamTimer); + return 0; +} + +void streamTimerCleanUp() { + stInfo("cleanup stream timer, %p", streamTimer); + taosTmrCleanUp(streamTimer); + streamTimer = NULL; +} + +tmr_h streamTimerGetInstance() { + return streamTimer; +} diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 973eb8aaa5..8dab694975 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -1100,6 +1100,14 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend int32_t code = -1; SSnapshot snapshot = {0}; + if (pMsg->snapBeginIndex > pSyncNode->commitIndex) { + sSError(pSender, + "snapshot begin index is greater than commit index. snapBeginIndex:%" PRId64 ", commitIndex:%" PRId64, + pMsg->snapBeginIndex, pSyncNode->commitIndex); + terrno = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG; + return -1; + } + taosThreadMutexLock(&pSender->pSndBuf->mutex); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 3fca381fda..5770497c7d 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -383,6 +383,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4479.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td29793.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/insert_timestamp.py +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td29157.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_tag_index.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/information_schema.py diff --git a/tests/system-test/1-insert/test_td29157.py b/tests/system-test/1-insert/test_td29157.py new file mode 100644 index 0000000000..69ce60c6a9 --- /dev/null +++ b/tests/system-test/1-insert/test_td29157.py @@ -0,0 +1,55 @@ +from util.log import * +from util.cases import * +from util.sql import * + + +class TDTestCase: + """Verify td-29157 + """ + def init(self, conn, logSql, replicaVer=1): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), True) + self.conn = conn + self.db_name = "td29157" + + def run(self): + self.conn.execute(f"drop database if exists {self.db_name}") + self.conn.execute(f"CREATE DATABASE {self.db_name}") + self.conn.execute(f"USE {self.db_name}") + + tdSql.execute("create table stb1 (ts timestamp, c0 varbinary(10)) tags(t0 varbinary(10));") + tdSql.execute("insert into ctb11 using stb1 tags(\"0x11\") values(now,\"0x01\");") + tdSql.execute("insert into ctb12 using stb1 tags(\"0x22\") values(now,\"0x02\");") + tdSql.query("show tags from ctb11;") + tdSql.checkRows(1) + tdSql.checkData(0, 3, 't0') + tdSql.checkData(0, 4, 'VARBINARY(10)') + tdSql.checkData(0, 5, '\\x30783131') + + tdSql.execute("create table stb2 (ts timestamp, c0 geometry(500)) tags(t0 geometry(100));") + tdSql.execute("insert into ctb2 using stb2 tags('LINESTRING (1.000000 1.000000, 2.000000 2.000000, 5.000000 5.000000)') values(now,'POLYGON((1.0 1.0, 2.0 2.0, 1.0 1.0))');") + tdSql.query("show tags from ctb2;") + tdSql.checkRows(1) + tdSql.checkData(0, 3, 't0') + tdSql.checkData(0, 4, 'GEOMETRY(100)') + tdSql.checkData(0, 5, 'LINESTRING (1.000000 1.000000, 2.000000 2.000000, 5.000000 5.000000)') + + tdSql.execute("create table stb3 (ts timestamp, c0 bigint, c1 varchar(10)) tags(t0 geometry(100), t1 varbinary(10));") + tdSql.execute("insert into ctb3 using stb3 tags('POLYGON EMPTY', \"0x03\") values(now,100, \"abc\");") + tdSql.query("show tags from ctb3;") + tdSql.checkRows(2) + tdSql.checkData(0, 3, 't0') + tdSql.checkData(0, 4, 'GEOMETRY(100)') + tdSql.checkData(0, 5, 'POLYGON EMPTY') + tdSql.checkData(1, 3, 't1') + tdSql.checkData(1, 4, 'VARBINARY(10)') + tdSql.checkData(1, 5, '\\x30783033') + + + def stop(self): + tdSql.execute("drop database if exists %s" % self.db_name) + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase())