Merge branch '3.0' into enh/TD-30554-3.0

This commit is contained in:
kailixu 2024-06-17 05:52:15 +08:00
commit e3645def98
25 changed files with 537 additions and 371 deletions

View File

@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG main GIT_TAG 3.0
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE

View File

@ -43,7 +43,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta);
void tqSetRestoreVersionInfo(SStreamTask* pTask); void tqSetRestoreVersionInfo(SStreamTask* pTask);
int32_t tqExpandStreamTask(SStreamTask* pTask);
#endif // TDENGINE_TQ_COMMON_H #endif // TDENGINE_TQ_COMMON_H

View File

@ -157,7 +157,8 @@ typedef enum EStreamTaskEvent {
typedef void FTbSink(SStreamTask* pTask, void* vnode, void* data); typedef void FTbSink(SStreamTask* pTask, void* vnode, void* data);
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver); typedef int32_t FTaskBuild(void* ahandle, SStreamTask* pTask, int64_t ver);
typedef int32_t FTaskExpand(SStreamTask* pTask);
typedef struct { typedef struct {
int8_t type; int8_t type;
@ -205,7 +206,6 @@ typedef struct {
typedef struct { typedef struct {
char stbFullName[TSDB_TABLE_FNAME_LEN]; char stbFullName[TSDB_TABLE_FNAME_LEN];
int32_t waitingRspCnt;
SUseDbRsp dbInfo; SUseDbRsp dbInfo;
} STaskDispatcherShuffle; } STaskDispatcherShuffle;
@ -312,15 +312,18 @@ typedef struct SMetaHbInfo SMetaHbInfo;
typedef struct SDispatchMsgInfo { typedef struct SDispatchMsgInfo {
SStreamDispatchReq* pData; // current dispatch data SStreamDispatchReq* pData; // current dispatch data
int8_t dispatchMsgType;
int64_t checkpointId;// checkpoint id msg int8_t dispatchMsgType;
int32_t transId; // transId for current checkpoint int64_t checkpointId; // checkpoint id msg
int16_t msgType; // dispatch msg type int32_t transId; // transId for current checkpoint
int32_t retryCount; // retry send data count int16_t msgType; // dispatch msg type
int64_t startTs; // dispatch start time, record total elapsed time for dispatch int32_t msgId;
SArray* pRetryList; // current dispatch successfully completed node of downstream int64_t startTs; // dispatch start time, record total elapsed time for dispatch
void* pRetryTmr; // used to dispatch data after a given time duration int64_t rspTs; // latest rsp time
void* pRspTmr; // used to dispatch data after a given time duration void* pRetryTmr; // used to dispatch data after a given time duration
TdThreadMutex lock;
int8_t inMonitor;
SArray* pSendInfo; // SArray<SDispatchEntry>
} SDispatchMsgInfo; } SDispatchMsgInfo;
typedef struct STaskQueue { typedef struct STaskQueue {
@ -484,7 +487,8 @@ typedef struct SStreamMeta {
SArray* pTaskList; // SArray<STaskId*> SArray* pTaskList; // SArray<STaskId*>
void* ahandle; void* ahandle;
TXN* txn; TXN* txn;
FTaskExpand* expandFunc; FTaskBuild* buildTaskFn;
FTaskExpand* expandTaskFn;
int32_t vgId; int32_t vgId;
int64_t stage; int64_t stage;
int32_t role; int32_t role;
@ -708,8 +712,8 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st);
// stream task meta // stream task meta
void streamMetaInit(); void streamMetaInit();
void streamMetaCleanup(); void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild expandFunc, FTaskExpand expandTaskFn,
startComplete_fn_t fn); int32_t vgId, int64_t stage, startComplete_fn_t fn);
void streamMetaClose(SStreamMeta* streamMeta); void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);

View File

@ -88,7 +88,7 @@ static int32_t parseSignAndUInteger(const char *z, int32_t n, bool *is_neg, uint
if (errno == ERANGE || errno == EINVAL || endPtr - z != n) { if (errno == ERANGE || errno == EINVAL || endPtr - z != n) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if (val > UINT64_MAX) { if (val > (double)UINT64_MAX) {
errno = ERANGE; errno = ERANGE;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
@ -172,7 +172,7 @@ int32_t toIntegerEx(const char *z, int32_t n, uint32_t type, int64_t *value) {
} break; } break;
case TK_NK_FLOAT: { case TK_NK_FLOAT: {
double val = round(taosStr2Double(z, &endPtr)); double val = round(taosStr2Double(z, &endPtr));
if (!IS_VALID_INT64(val)) { if(val < (double)INT64_MIN || val > (double)INT64_MAX){
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if (errno == ERANGE || errno == EINVAL || endPtr - z != n) { if (errno == ERANGE || errno == EINVAL || endPtr - z != n) {
@ -271,7 +271,7 @@ int32_t toUIntegerEx(const char *z, int32_t n, uint32_t type, uint64_t *value) {
} break; } break;
case TK_NK_FLOAT: { case TK_NK_FLOAT: {
double val = round(taosStr2Double(p, &endPtr)); double val = round(taosStr2Double(p, &endPtr));
if (!IS_VALID_UINT64(val)) { if (val < 0 || val > (double)UINT64_MAX) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if (errno == ERANGE || errno == EINVAL || endPtr - z != n) { if (errno == ERANGE || errno == EINVAL || endPtr - z != n) {

View File

@ -25,14 +25,6 @@
#define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0) #define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0)
// clang-format on // clang-format on
static STaskId replaceStreamTaskId(SStreamTask *pTask) {
ASSERT(pTask->info.fillHistory);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
pTask->id.streamId = pTask->streamTaskId.streamId;
pTask->id.taskId = pTask->streamTaskId.taskId;
return id;
}
static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) { static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) {
ASSERT(pTask->info.fillHistory); ASSERT(pTask->info.fillHistory);
pTask->id.taskId = pId->taskId; pTask->id.taskId = pId->taskId;
@ -85,7 +77,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
startRsync(); startRsync();
pSnode->msgCb = pOption->msgCb; pSnode->msgCb = pOption->msgCb;
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback); pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskBuild *)sndExpandTask, tqExpandStreamTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback);
if (pSnode->pMeta == NULL) { if (pSnode->pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL; goto FAIL;

View File

@ -264,7 +264,7 @@ int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq); int32_t tqScanWal(STQ* pTq);
int tqCommit(STQ*); int tqCommit(STQ*);

View File

@ -90,7 +90,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
int32_t tqInitialize(STQ* pTq) { int32_t tqInitialize(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback); pTq->pStreamMeta =
streamMetaOpen(pTq->path, pTq, tqBuildStreamTask, tqExpandStreamTask, vgId, -1, tqStartTaskCompleteCallback);
if (pTq->pStreamMeta == NULL) { if (pTq->pStreamMeta == NULL) {
return -1; return -1;
} }
@ -713,7 +714,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) {
STQ* pTq = (STQ*) pTqObj;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("s-task:0x%x start to build task", pTask->id.taskId); tqDebug("s-task:0x%x start to build task", pTask->id.taskId);
@ -1010,16 +1013,6 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
} }
int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) {
int32_t vgId = TD_VID(pTq->pVnode);
SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
// if (!pTq->pVnode->restored) {
// tqDebug("vgId:%d update-checkpoint-info msg received during restoring, checkpointId:%" PRId64
// ", transId:%d s-task:0x%x ignore it",
// vgId, pReq->checkpointId, pReq->transId, pReq->taskId);
// return TSDB_CODE_SUCCESS;
// }
return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, msg, msgLen); return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, msg, msgLen);
} }

View File

@ -362,6 +362,7 @@ int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
pRsp->upstreamNodeId = htonl(pRsp->upstreamNodeId);
pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId); pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
pRsp->streamId = htobe64(pRsp->streamId); pRsp->streamId = htobe64(pRsp->streamId);
pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId); pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
@ -369,6 +370,9 @@ int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
pRsp->stage = htobe64(pRsp->stage); pRsp->stage = htobe64(pRsp->stage);
pRsp->msgId = htonl(pRsp->msgId); pRsp->msgId = htonl(pRsp->msgId);
tqDebug("s-task:0x%x vgId:%d recv dispatch-rsp from 0x%x vgId:%d", pRsp->upstreamTaskId, pRsp->upstreamNodeId,
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId);
if (pTask) { if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
@ -414,7 +418,9 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
tCleanupStreamRetrieveReq(&req); tCleanupStreamRetrieveReq(&req);
return code;
// always return success, to disable the auto rsp
return TSDB_CODE_SUCCESS;
} }
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {

View File

@ -46,6 +46,12 @@ SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList,
} }
pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk)); pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
if (pLoadInfo->aSttBlk == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFreeClear(pLoadInfo);
return NULL;
}
pLoadInfo->pSchema = pSchema; pLoadInfo->pSchema = pSchema;
pLoadInfo->colIds = colList; pLoadInfo->colIds = colList;
pLoadInfo->numOfCols = numOfCols; pLoadInfo->numOfCols = numOfCols;
@ -107,15 +113,21 @@ void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoa
SArray *pList = taosArrayGetP(pLDataIterArray, i); SArray *pList = taosArrayGetP(pLDataIterArray, i);
for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) { for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) {
SLDataIter *pIter = taosArrayGetP(pList, j); SLDataIter *pIter = taosArrayGetP(pList, j);
if (pIter->pBlockLoadInfo == NULL) {
continue;
}
SSttBlockLoadCostInfo* pCost = &pIter->pBlockLoadInfo->cost;
if (pLoadCost != NULL) { if (pLoadCost != NULL) {
pLoadCost->loadBlocks += pIter->pBlockLoadInfo->cost.loadBlocks; pLoadCost->loadBlocks += pCost->loadBlocks;
pLoadCost->loadStatisBlocks += pIter->pBlockLoadInfo->cost.loadStatisBlocks; pLoadCost->loadStatisBlocks += pCost->loadStatisBlocks;
pLoadCost->blockElapsedTime += pIter->pBlockLoadInfo->cost.blockElapsedTime; pLoadCost->blockElapsedTime += pCost->blockElapsedTime;
pLoadCost->statisElapsedTime += pIter->pBlockLoadInfo->cost.statisElapsedTime; pLoadCost->statisElapsedTime += pCost->statisElapsedTime;
} }
destroyLDataIter(pIter); destroyLDataIter(pIter);
} }
taosArrayDestroy(pList); taosArrayDestroy(pList);
} }
@ -903,6 +915,10 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
if (pLoadInfo == NULL) { if (pLoadInfo == NULL) {
pLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols); pLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols);
if (pLoadInfo == NULL) {
code = terrno;
goto _end;
}
} }
memset(pIter, 0, sizeof(SLDataIter)); memset(pIter, 0, sizeof(SLDataIter));

View File

@ -2240,7 +2240,8 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
}; };
SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))}; SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))};
int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info);
int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return false; return false;
} }

View File

@ -985,6 +985,10 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra
if (pIter->pBlockLoadInfo == NULL) { if (pIter->pBlockLoadInfo == NULL) {
pIter->pBlockLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols); pIter->pBlockLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols);
if (pIter->pBlockLoadInfo == NULL) {
tsdbError("failed to create block load info, code: out of memory, %s", pstr);
continue;
}
} }
// load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file // load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file

View File

@ -64,7 +64,7 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
static void resetBoundingBox(MinMaxEntry *range, int32_t type) { static void resetBoundingBox(MinMaxEntry *range, int32_t type) {
if (IS_SIGNED_NUMERIC_TYPE(type)) { if (IS_SIGNED_NUMERIC_TYPE(type)) {
range->dMaxVal = INT64_MIN; range->dMaxVal = INT64_MIN;
range->dMinVal = INT64_MAX; range->dMinVal = (double)INT64_MAX;
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
range->u64MaxVal = 0; range->u64MaxVal = 0;
range->u64MinVal = UINT64_MAX; range->u64MinVal = UINT64_MAX;

View File

@ -26,6 +26,10 @@
#include "tudf.h" #include "tudf.h"
#include "tudfInt.h" #include "tudfInt.h"
#ifdef _TD_DARWIN_64
#include <mach-o/dyld.h>
#endif
typedef struct SUdfdData { typedef struct SUdfdData {
bool startCalled; bool startCalled;
bool needCleanUp; bool needCleanUp;

View File

@ -26,20 +26,16 @@
extern "C" { extern "C" {
#endif #endif
#define CHECK_RSP_CHECK_INTERVAL 300 #define CHECK_RSP_CHECK_INTERVAL 300
#define LAUNCH_HTASK_INTERVAL 100 #define LAUNCH_HTASK_INTERVAL 100
#define WAIT_FOR_MINIMAL_INTERVAL 100.00 #define WAIT_FOR_MINIMAL_INTERVAL 100.00
#define MAX_RETRY_LAUNCH_HISTORY_TASK 40 #define MAX_RETRY_LAUNCH_HISTORY_TASK 40
#define RETRY_LAUNCH_INTERVAL_INC_RATE 1.2 #define RETRY_LAUNCH_INTERVAL_INC_RATE 1.2
#define MAX_BLOCK_NAME_NUM 1024
#define MAX_BLOCK_NAME_NUM 1024 #define DISPATCH_RETRY_INTERVAL_MS 300
#define DISPATCH_RETRY_INTERVAL_MS 300 #define META_HB_CHECK_INTERVAL 200
#define MAX_CONTINUE_RETRY_COUNT 5 #define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
#define META_HB_CHECK_INTERVAL 200
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
#define STREAM_TASK_QUEUE_CAPACITY 20480 #define STREAM_TASK_QUEUE_CAPACITY 20480
#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30) #define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30)
@ -118,6 +114,14 @@ typedef struct {
int32_t taskId; int32_t taskId;
} STaskTriggerSendInfo; } STaskTriggerSendInfo;
typedef struct {
int32_t nodeId;
int32_t status;
int64_t sendTs;
int64_t rspTs;
int32_t retryCount;
} SDispatchEntry;
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
int64_t recvTs; int64_t recvTs;
@ -143,6 +147,12 @@ typedef enum {
EXEC_AFTER_IDLE = 0x1, EXEC_AFTER_IDLE = 0x1,
} EExtractDataCode; } EExtractDataCode;
typedef enum ECHECKPOINT_BACKUP_TYPE {
DATA_UPLOAD_DISABLE = -1,
DATA_UPLOAD_S3 = 0,
DATA_UPLOAD_RSYNC = 1,
} ECHECKPOINT_BACKUP_TYPE;
extern void* streamTimer; extern void* streamTimer;
extern int32_t streamBackendId; extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId; extern int32_t streamBackendCfWrapperId;
@ -153,10 +163,9 @@ void streamTimerCleanUp();
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration); void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration);
int32_t streamDispatchStreamBlock(SStreamTask* pTask); int32_t streamDispatchStreamBlock(SStreamTask* pTask);
void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups); void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups);
int32_t getNumOfDispatchBranch(SStreamTask* pTask);
void clearBufferedDispatchMsg(SStreamTask* pTask); void clearBufferedDispatchMsg(SStreamTask* pTask);
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
@ -165,7 +174,7 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT
SArray* pRes); SArray* pRes);
void destroyStreamDataBlock(SStreamDataBlock* pBlock); void destroyStreamDataBlock(SStreamDataBlock* pBlock);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData, const char* idstr);
int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock); int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock);
int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
@ -204,12 +213,6 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
void streamMetaRemoveDB(void* arg, char* key); void streamMetaRemoveDB(void* arg, char* key);
typedef enum ECHECKPOINT_BACKUP_TYPE {
DATA_UPLOAD_DISABLE = -1,
DATA_UPLOAD_S3 = 0,
DATA_UPLOAD_RSYNC = 1,
} ECHECKPOINT_BACKUP_TYPE;
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
int32_t streamTaskDownloadCheckpointData(const char* id, char* path); int32_t streamTaskDownloadCheckpointData(const char* id, char* path);

View File

@ -509,7 +509,7 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
STaskOutputInfo* pOutputInfo = &pTask->outputInfo; STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher; STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher;
setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->taskId); setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->nodeId);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64, stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);

View File

@ -104,10 +104,12 @@ void destroyStreamDataBlock(SStreamDataBlock* pBlock) {
taosFreeQitem(pBlock); taosFreeQitem(pBlock);
} }
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) { int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData, const char* id) {
SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock)); SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock));
if (pArray == NULL) { if (pArray == NULL) {
return -1; terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("failed to prepare retrieve block, %s", id);
return terrno;
} }
taosArrayPush(pArray, &(SSDataBlock){0}); taosArrayPush(pArray, &(SSDataBlock){0});
@ -126,7 +128,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
pData->reqId = pReq->reqId; pData->reqId = pReq->reqId;
pData->blocks = pArray; pData->blocks = pArray;
return 0; return TSDB_CODE_SUCCESS;
} }
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) { SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {

View File

@ -23,13 +23,16 @@ typedef struct SBlockName {
char parTbName[TSDB_TABLE_NAME_LEN]; char parTbName[TSDB_TABLE_NAME_LEN];
} SBlockName; } SBlockName;
static void doRetryDispatchData(void* param, void* tmrId); static void doMonitorDispatchData(void* param, void* tmrId);
static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet); static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet);
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq); static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq);
static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock,
int32_t vgSz, int64_t groupId); int64_t groupId, int64_t now);
static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
int32_t numOfBlocks, int64_t dstTaskId, int32_t type); int32_t numOfBlocks, int64_t dstTaskId, int32_t type);
static int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now);
static bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now);
static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t now, bool lock);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
pMsg->msgType = msgType; pMsg->msgType = msgType;
@ -42,7 +45,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas
pReq->streamId = pTask->id.streamId; pReq->streamId = pTask->id.streamId;
pReq->srcVgId = vgId; pReq->srcVgId = vgId;
pReq->stage = pTask->pMeta->stage; pReq->stage = pTask->pMeta->stage;
pReq->msgId = pTask->execInfo.dispatch; pReq->msgId = pTask->msgInfo.msgId;
pReq->upstreamTaskId = pTask->id.taskId; pReq->upstreamTaskId = pTask->id.taskId;
pReq->upstreamChildId = pTask->info.selfChildId; pReq->upstreamChildId = pTask->info.selfChildId;
pReq->upstreamNodeId = pTask->info.nodeId; pReq->upstreamNodeId = pTask->info.nodeId;
@ -65,6 +68,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas
void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){ void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp)); void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId); ((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
pCont->streamId = pReq->streamId; pCont->streamId = pReq->streamId;
pCont->rspToTaskId = pReq->srcTaskId; pCont->rspToTaskId = pReq->srcTaskId;
@ -216,26 +220,66 @@ void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
} }
int32_t getNumOfDispatchBranch(SStreamTask* pTask) {
return (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH)
? 1
: taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
}
void clearBufferedDispatchMsg(SStreamTask* pTask) { void clearBufferedDispatchMsg(SStreamTask* pTask) {
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
if (pMsgInfo->pData != NULL) { if (pMsgInfo->pData != NULL) {
destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask)); destroyDispatchMsg(pMsgInfo->pData, streamTaskGetNumOfDownstream(pTask));
} }
pMsgInfo->checkpointId = -1; pMsgInfo->checkpointId = -1;
pMsgInfo->transId = -1; pMsgInfo->transId = -1;
pMsgInfo->pData = NULL; pMsgInfo->pData = NULL;
pMsgInfo->dispatchMsgType = 0; pMsgInfo->dispatchMsgType = 0;
taosThreadMutexLock(&pMsgInfo->lock);
taosArrayClear(pTask->msgInfo.pSendInfo);
taosThreadMutexUnlock(&pMsgInfo->lock);
}
static SStreamDispatchReq* createDispatchDataReq(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = 0;
int32_t type = pTask->outputInfo.type;
int32_t num = streamTaskGetNumOfDownstream(pTask);
ASSERT(type == TASK_OUTPUT__SHUFFLE_DISPATCH || type == TASK_OUTPUT__FIXED_DISPATCH);
SStreamDispatchReq* pReqs = taosMemoryCalloc(num, sizeof(SStreamDispatchReq));
if (pReqs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < numOfVgroups; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
destroyDispatchMsg(pReqs, numOfVgroups);
terrno = code;
return NULL;
}
}
} else {
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
code = tInitStreamDispatchReq(pReqs, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReqs);
terrno = code;
return NULL;
}
}
return pReqs;
} }
static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) { static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = 0; int32_t code = 0;
int64_t now = taosGetTimestampMs();
int32_t numOfBlocks = taosArrayGetSize(pData->blocks); int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
ASSERT(numOfBlocks != 0 && pTask->msgInfo.pData == NULL); ASSERT(numOfBlocks != 0 && pTask->msgInfo.pData == NULL);
@ -247,48 +291,29 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
pTask->msgInfo.transId = p->info.window.ekey; pTask->msgInfo.transId = p->info.window.ekey;
} }
SStreamDispatchReq* pReqs = createDispatchDataReq(pTask, pData);
if (pReqs == NULL) {
stError("s-task:%s failed to create dispatch req", pTask->id.idStr);
return terrno;
}
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq));
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
return code;
}
for (int32_t i = 0; i < numOfBlocks; i++) { for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
code = streamAddBlockIntoDispatchMsg(pDataBlock, pReq); code = streamAddBlockIntoDispatchMsg(pDataBlock, pReqs);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
destroyDispatchMsg(pReq, 1); destroyDispatchMsg(pReqs, 1);
return code; return code;
} }
} }
pTask->msgInfo.pData = pReq; addDispatchEntry(&pTask->msgInfo, pTask->outputInfo.fixedDispatcher.nodeId, now, true);
pTask->msgInfo.pData = pReqs;
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t rspCnt = atomic_load_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt);
ASSERT(rspCnt == 0);
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo); int32_t numOfVgroups = taosArrayGetSize(vgInfo);
SStreamDispatchReq* pReqs = taosMemoryCalloc(numOfVgroups, sizeof(SStreamDispatchReq));
if (pReqs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfVgroups; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
destroyDispatchMsg(pReqs, numOfVgroups);
return code;
}
}
for (int32_t i = 0; i < numOfBlocks; i++) { for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
@ -304,7 +329,8 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
// it's a new vnode to receive dispatch msg, so add one // it's a new vnode to receive dispatch msg, so add one
if (pReqs[j].blockNum == 0) { if (pReqs[j].blockNum == 0) {
atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1); SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, true);
} }
pReqs[j].blockNum++; pReqs[j].blockNum++;
@ -313,7 +339,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
continue; continue;
} }
code = streamSearchAndAddBlock(pTask, pReqs, pDataBlock, numOfVgroups, pDataBlock->info.id.groupId); code = streamSearchAndAddBlock(pTask, pReqs, pDataBlock, pDataBlock->info.id.groupId, now);
if (code != 0) { if (code != 0) {
destroyDispatchMsg(pReqs, numOfVgroups); destroyDispatchMsg(pReqs, numOfVgroups);
return code; return code;
@ -327,9 +353,9 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64 " %p", pTask->id.idStr, stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64 " %p", pTask->id.idStr,
pTask->execInfo.dispatch, pTask->pMeta->stage, pTask->msgInfo.pData); pTask->execInfo.dispatch, pTask->pMeta->stage, pTask->msgInfo.pData);
} else { } else {
int32_t numOfBranches = taosArrayGetSize(pTask->msgInfo.pSendInfo);
stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64 " dstVgNum:%d %p", pTask->id.idStr, stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64 " dstVgNum:%d %p", pTask->id.idStr,
pTask->execInfo.dispatch, pTask->pMeta->stage, pTask->outputInfo.shuffleDispatcher.waitingRspCnt, pTask->execInfo.dispatch, pTask->pMeta->stage, numOfBranches, pTask->msgInfo.pData);
pTask->msgInfo.pData);
} }
return code; return code;
@ -337,8 +363,8 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatchMsg) { static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatchMsg) {
int32_t code = 0; int32_t code = 0;
int32_t msgId = pTask->execInfo.dispatch;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t msgId = pTask->msgInfo.msgId;
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId; int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId;
@ -352,10 +378,10 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
} else { } else {
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo); int32_t numOfVgroups = taosArrayGetSize(vgInfo);
int32_t numOfBranches = taosArrayGetSize(pTask->msgInfo.pSendInfo);
int32_t actualVgroups = pTask->outputInfo.shuffleDispatcher.waitingRspCnt;
stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d/%d vgroup(s), msgId:%d", id, stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d/%d vgroup(s), msgId:%d", id,
pTask->info.selfChildId, actualVgroups, numOfVgroups, msgId); pTask->info.selfChildId, numOfBranches, numOfVgroups, msgId);
int32_t numOfSend = 0; int32_t numOfSend = 0;
for (int32_t i = 0; i < numOfVgroups; i++) { for (int32_t i = 0; i < numOfVgroups; i++) {
@ -370,7 +396,7 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
} }
// no need to try remain, all already send. // no need to try remain, all already send.
if (++numOfSend == actualVgroups) { if (++numOfSend == numOfBranches) {
break; break;
} }
} }
@ -382,102 +408,168 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
return code; return code;
} }
static void doRetryDispatchData(void* param, void* tmrId) { static void setNotInDispatchMonitor(SDispatchMsgInfo* pMsgInfo) {
SStreamTask* pTask = param; taosThreadMutexLock(&pMsgInfo->lock);
const char* id = pTask->id.idStr; pMsgInfo->inMonitor = 0;
int32_t msgId = pTask->execInfo.dispatch; taosThreadMutexUnlock(&pMsgInfo->lock);
}
static void setResendInfo(SDispatchEntry* pEntry, int64_t now) {
pEntry->sendTs = now;
pEntry->rspTs = -1;
pEntry->retryCount += 1;
}
static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t now, bool lock) {
SDispatchEntry entry = {.nodeId = nodeId, .rspTs = -1, .status = 0, .sendTs = now};
if (lock) {
taosThreadMutexLock(&pMsgInfo->lock);
}
taosArrayPush(pMsgInfo->pSendInfo, &entry);
if (lock) {
taosThreadMutexUnlock(&pMsgInfo->lock);
}
}
static void doSendFailedDispatch(SStreamTask* pTask, SDispatchEntry* pEntry, int64_t now, const char* pMsg) {
SStreamDispatchReq* pReq = pTask->msgInfo.pData;
int32_t msgId = pTask->msgInfo.msgId;
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
setResendInfo(pEntry, now);
for (int32_t j = 0; j < numOfVgroups; ++j) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
if (pVgInfo->vgId == pEntry->nodeId) {
int32_t code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet);
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d for %s, msgId:%d, code:%s",
pTask->id.idStr, pTask->info.selfChildId, pReq[j].blockNum, pVgInfo->vgId, pMsg, msgId, tstrerror(code));
break;
}
}
}
static void doMonitorDispatchData(void* param, void* tmrId) {
SStreamTask* pTask = param;
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
int32_t msgId = pMsgInfo->msgId;
int32_t code = 0;
int64_t now = taosGetTimestampMs();
stDebug("s-task:%s start monitor dispatch data", id);
if (streamTaskShouldStop(pTask)) { if (streamTaskShouldStop(pTask)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
setNotInDispatchMonitor(pMsgInfo);
return; return;
} }
ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT); // slave task not handle the dispatch, downstream not ready will break the monitor timer
// follower not handle the dispatch rsp
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stError("s-task:%s vgId:%d follower or downstream not ready, jump out of monitor tmr, ref:%d", id, vgId, ref);
setNotInDispatchMonitor(pMsgInfo);
return;
}
int32_t code = 0; taosThreadMutexLock(&pMsgInfo->lock);
if (pTask->outputq.status == TASK_OUTPUT_STATUS__NORMAL) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s not in dispatch procedure, abort from timer, ref:%d", pTask->id.idStr, ref);
pTask->msgInfo.inMonitor = 0;
taosThreadMutexUnlock(&pMsgInfo->lock);
return;
}
taosThreadMutexUnlock(&pMsgInfo->lock);
int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now);
if (numOfFailed == 0) {
stDebug("s-task:%s no error occurs, check again in %dms", id, DISPATCH_RETRY_INTERVAL_MS);
streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
return;
}
{ {
SArray* pList = taosArrayDup(pTask->msgInfo.pRetryList, NULL);
taosArrayClear(pTask->msgInfo.pRetryList);
SStreamDispatchReq* pReq = pTask->msgInfo.pData; SStreamDispatchReq* pReq = pTask->msgInfo.pData;
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id,
int32_t numOfVgroups = taosArrayGetSize(vgInfo); pTask->info.selfChildId, msgId);
int32_t numOfFailed = taosArrayGetSize(pList); int32_t numOfRetry = 0;
stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch blocks to %d vgroup(s), msgId:%d", id, for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) {
pTask->info.selfChildId, numOfFailed, msgId); SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i);
if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) {
continue;
}
for (int32_t i = 0; i < numOfFailed; i++) { // downstream not rsp yet beyond threshold that is 10s
int32_t vgId = *(int32_t*)taosArrayGet(pList, i); if (isDispatchRspTimeout(pEntry, now)) { // not respond yet beyonds 30s, re-send data
doSendFailedDispatch(pTask, pEntry, now, "timeout");
numOfRetry += 1;
continue;
}
for (int32_t j = 0; j < numOfVgroups; ++j) { // downstream inputQ is closed
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); if (pEntry->status == TASK_INPUT_STATUS__BLOCKED) {
if (pVgInfo->vgId == vgId) { doSendFailedDispatch(pTask, pEntry, now, "downstream inputQ blocked");
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, numOfRetry += 1;
pTask->info.selfChildId, pReq[j].blockNum, pVgInfo->vgId); continue;
}
code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet); // handle other errors
if (code < 0) { if (pEntry->status != TSDB_CODE_SUCCESS) {
break; doSendFailedDispatch(pTask, pEntry, now, "downstream error");
} numOfRetry += 1;
}
} }
} }
stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr,
numOfFailed, msgId); numOfRetry, msgId);
} else { } else {
int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId; int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId;
SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet; SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d", id, ASSERT(taosArrayGetSize(pTask->msgInfo.pSendInfo) == 1);
pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId); SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0);
code = doSendDispatchMsg(pTask, pReq, vgId, pEpSet); setResendInfo(pEntry, now);
code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet);
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id,
pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code));
} }
taosArrayDestroy(pList);
} }
if (code != TSDB_CODE_SUCCESS) { if (streamTaskShouldStop(pTask)) {
if (!streamTaskShouldStop(pTask)) {
// stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
// atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
if (streamTaskShouldPause(pTask)) {
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS * 10);
} else {
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
}
} else {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
}
} else {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref); stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
} setNotInDispatchMonitor(pMsgInfo);
}
void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) {
pTask->msgInfo.retryCount++;
stTrace("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr,
waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount);
if (pTask->msgInfo.pRetryTmr != NULL) {
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr);
} else { } else {
pTask->msgInfo.pRetryTmr = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamTimer); streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
} }
} }
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) {
int64_t groupId) { if (pTask->msgInfo.pRetryTmr != NULL) {
taosTmrReset(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr);
} else {
pTask->msgInfo.pRetryTmr = taosTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer);
}
}
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock,
int64_t groupId, int64_t now) {
uint32_t hashValue = 0; uint32_t hashValue = 0;
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
if (pTask->pNameMap == NULL) { if (pTask->pNameMap == NULL) {
@ -495,23 +587,24 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
} else { } else {
char ctbName[TSDB_TABLE_FNAME_LEN] = {0}; char ctbName[TSDB_TABLE_FNAME_LEN] = {0};
if (pDataBlock->info.parTbName[0]) { if (pDataBlock->info.parTbName[0]) {
if(pTask->subtableWithoutMd5 != 1 && if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(pDataBlock->info.parTbName) &&
!isAutoTableName(pDataBlock->info.parTbName) && !alreadyAddGroupId(pDataBlock->info.parTbName, groupId) && groupId != 0) {
!alreadyAddGroupId(pDataBlock->info.parTbName, groupId) && if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
groupId != 0){
if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId); buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId);
}else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) { } else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
buildCtbNameAddGroupId(pTask->outputInfo.shuffleDispatcher.stbFullName, pDataBlock->info.parTbName, groupId); buildCtbNameAddGroupId(pTask->outputInfo.shuffleDispatcher.stbFullName, pDataBlock->info.parTbName, groupId);
} }
} }
} else { } else {
buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
} }
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db,
pDataBlock->info.parTbName);
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo; SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo;
hashValue = taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); hashValue =
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
SBlockName bln = {0}; SBlockName bln = {0};
bln.hashValue = hashValue; bln.hashValue = hashValue;
memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName)); memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
@ -520,20 +613,24 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
} }
} }
bool found = false; bool found = false;
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
// TODO: optimize search // TODO: optimize search
int32_t j; taosThreadMutexLock(&pTask->msgInfo.lock);
for (j = 0; j < vgSz; j++) {
for (int32_t j = 0; j < numOfVgroups; j++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
ASSERT(pVgInfo->vgId > 0);
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
taosThreadMutexUnlock(&pTask->msgInfo.lock);
return -1; return -1;
} }
if (pReqs[j].blockNum == 0) { if (pReqs[j].blockNum == 0) {
atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1); SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j);
addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false);
} }
pReqs[j].blockNum++; pReqs[j].blockNum++;
@ -541,10 +638,28 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
break; break;
} }
} }
taosThreadMutexUnlock(&pTask->msgInfo.lock);
ASSERT(found); ASSERT(found);
return 0; return 0;
} }
static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) {
pInfo->startTs = taosGetTimestampMs();
pInfo->rspTs = -1;
pInfo->msgId = msgId;
}
static void clearDispatchInfo(SDispatchMsgInfo* pInfo) {
pInfo->startTs = -1;
pInfo->msgId = -1;
pInfo->rspTs = -1;
}
static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) {
pInfo->rspTs = recvTs;
}
int32_t streamDispatchStreamBlock(SStreamTask* pTask) { int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH ||
pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH)); pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
@ -587,7 +702,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
type == STREAM_INPUT__TRANS_STATE); type == STREAM_INPUT__TRANS_STATE);
pTask->execInfo.dispatch += 1; pTask->execInfo.dispatch += 1;
pTask->msgInfo.startTs = taosGetTimestampMs(); initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch);
int32_t code = doBuildDispatchMsg(pTask, pBlock); int32_t code = doBuildDispatchMsg(pTask, pBlock);
if (code == 0) { if (code == 0) {
@ -599,34 +714,21 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
streamTaskInitTriggerDispatchInfo(pTask); streamTaskInitTriggerDispatchInfo(pTask);
} }
int32_t retryCount = 0; code = sendDispatchMsg(pTask, pTask->msgInfo.pData);
while (1) {
code = sendDispatchMsg(pTask, pTask->msgInfo.pData);
if (code == TSDB_CODE_SUCCESS) {
break;
}
stDebug("s-task:%s failed to dispatch msg:%d to downstream, code:%s, output status:%d, retry cnt:%d", id, taosThreadMutexLock(&pTask->msgInfo.lock);
pTask->execInfo.dispatch, tstrerror(terrno), pTask->outputq.status, retryCount); if (pTask->msgInfo.inMonitor == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
// todo deal with only partially success dispatch case stDebug("s-task:%s start dispatch monitor tmr in %dms, ref:%d, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS, ref,
atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0); tstrerror(code));
if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
clearBufferedDispatchMsg(pTask); pTask->msgInfo.inMonitor = 1;
return code; } else {
} stDebug("s-task:%s already in dispatch monitor tmr", id);
if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug(
"s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
break;
}
} }
taosThreadMutexUnlock(&pTask->msgInfo.lock);
// this block can not be deleted until it has been sent to downstream task successfully. // this block can not be deleted until it has been sent to downstream task successfully.
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -817,8 +919,10 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN; int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
ASSERT(dataStrLen > 0); ASSERT(dataStrLen > 0);
void* buf = taosMemoryCalloc(1, dataStrLen); void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1; if (buf == NULL) {
return -1;
}
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = 0; pRetrieve->useconds = 0;
@ -1031,23 +1135,6 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData); stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
if (delayDispatch) {
taosThreadMutexLock(&pTask->lock);
// we only set the dispatch msg info for current checkpoint trans
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK &&
pTask->chkInfo.pActiveInfo->activeId == pTask->msgInfo.checkpointId) {
ASSERT(pTask->chkInfo.pActiveInfo->transId == pTask->msgInfo.transId);
stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed",
pTask->id.idStr, downstreamId, pTask->msgInfo.checkpointId, pTask->msgInfo.transId);
streamTaskSetTriggerDispatchConfirmed(pTask, downstreamNodeId);
} else {
stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d discard, since expired",
pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId);
}
taosThreadMutexUnlock(&pTask->lock);
}
clearBufferedDispatchMsg(pTask); clearBufferedDispatchMsg(pTask);
int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs; int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
@ -1074,17 +1161,59 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
return 0; return 0;
} }
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, const char* id) {
const char* id = pTask->id.idStr; int32_t numOfRsp = 0;
int32_t vgId = pTask->pMeta->vgId; bool alreadySet = false;
int32_t msgId = pTask->execInfo.dispatch; bool updated = false;
#if 0 taosThreadMutexLock(&pMsgInfo->lock);
// for test purpose, build the failure case for(int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) {
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER) { SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j);
pRsp->inputStatus = TASK_INPUT_STATUS__REFUSED; if (pEntry->nodeId == vgId) {
ASSERT(!alreadySet);
pEntry->rspTs = now;
pEntry->status = code;
alreadySet = true;
updated = true;
stDebug("s-task:%s record the rsp recv, ts:%"PRId64" code:%d, idx:%d", id, now, code, j);
}
if (pEntry->rspTs != -1) {
numOfRsp += 1;
}
} }
#endif
taosThreadMutexUnlock(&pMsgInfo->lock);
ASSERT(updated);
return numOfRsp;
}
bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now) {
return (pEntry->rspTs == -1) && (now - pEntry->sendTs) > 30 * 1000;
}
int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now) {
int32_t numOfFailed = 0;
taosThreadMutexLock(&pMsgInfo->lock);
for (int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) {
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j);
if (pEntry->status != TSDB_CODE_SUCCESS || isDispatchRspTimeout(pEntry, now)) {
numOfFailed += 1;
}
}
taosThreadMutexUnlock(&pMsgInfo->lock);
return numOfFailed;
}
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
int32_t msgId = pMsgInfo->msgId;
int64_t now = taosGetTimestampMs();
int32_t totalRsp = 0;
// follower not handle the dispatch rsp // follower not handle the dispatch rsp
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
@ -1109,53 +1238,61 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId); id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id);
} else { } else {
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId, stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code)); pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
taosThreadMutexLock(&pTask->lock); totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, id);
taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);
taosThreadMutexUnlock(&pTask->lock);
} }
} else { // code == 0 } else { // code == 0
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED; pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED;
// block the input of current task, to push pressure to upstream // block the input of current task, to push pressure to upstream
taosThreadMutexLock(&pTask->lock); totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, id);
taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId); stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for retry dispatch", id,
taosThreadMutexUnlock(&pTask->lock); pRsp->downstreamTaskId, pRsp->downstreamNodeId);
} else {
if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) {
// todo handle the role-changed during checkpoint generation, add test case
stError(
"s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, downstream may become follower or "
"restart already, treat it as success",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
}
stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id, totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id);
pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS);
} else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) { {
// todo handle the agg task failure, add test case bool delayDispatch = (pMsgInfo->dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER && if (delayDispatch) {
pTask->info.taskLevel == TASK_LEVEL__SOURCE) { taosThreadMutexLock(&pTask->lock);
stError("s-task:%s failed to dispatch checkpoint-trigger msg, checkpointId:%" PRId64 // we only set the dispatch msg info for current checkpoint trans
", set the current checkpoint failed, and send rsp to mnode", if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK &&
id, pTask->chkInfo.pActiveInfo->activeId); pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId) {
{ // send checkpoint failure msg to mnode directly ASSERT(pTask->chkInfo.pActiveInfo->transId == pMsgInfo->transId);
pTask->chkInfo.pActiveInfo->failedId = pTask->chkInfo.pActiveInfo->activeId; // record the latest failed checkpoint id stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed",
pTask->chkInfo.pActiveInfo->activeId = pTask->chkInfo.pActiveInfo->activeId; pTask->id.idStr, pRsp->downstreamTaskId, pMsgInfo->checkpointId, pMsgInfo->transId);
streamTaskSendCheckpointSourceRsp(pTask);
streamTaskSetTriggerDispatchConfirmed(pTask, pRsp->downstreamNodeId);
} else {
stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64
" transId:%d discard, since expired",
pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId);
}
taosThreadMutexUnlock(&pTask->lock);
} }
} else {
stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
} }
} }
} }
int32_t leftRsp = 0; int32_t notRsp = taosArrayGetSize(pMsgInfo->pSendInfo) - totalRsp;
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
leftRsp = atomic_sub_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1); if (notRsp > 0) {
ASSERT(leftRsp >= 0);
if (leftRsp > 0) {
stDebug( stDebug(
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, waiting " "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, waiting "
"for %d rsp", "for %d rsp",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), leftRsp); id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), notRsp);
} else { } else {
stDebug( stDebug(
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, all rsp", "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, all rsp",
@ -1166,31 +1303,17 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code)); msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
} }
ASSERT(leftRsp >= 0);
// all msg rsp already, continue // all msg rsp already, continue
if (leftRsp == 0) { if (notRsp == 0) {
ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT); ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT);
// we need to re-try send dispatch msg to downstream tasks // we need to re-try send dispatch msg to downstream tasks
int32_t numOfFailed = taosArrayGetSize(pTask->msgInfo.pRetryList); int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now);
if (numOfFailed > 0) { if (numOfFailed == 0) { // this message has been sent successfully, let's try next one.
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, numOfFailed);
stDebug("s-task:%s waiting rsp set to be %d", id, pTask->outputInfo.shuffleDispatcher.waitingRspCnt);
}
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s failed to dispatch msg to downstream, add into timer to retry in %dms, ref:%d",
pTask->id.idStr, DISPATCH_RETRY_INTERVAL_MS, ref);
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
} else { // this message has been sent successfully, let's try next one.
pTask->msgInfo.retryCount = 0;
// trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state // trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) { if (pMsgInfo->dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", id, msgId); stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state",
id, msgId);
ASSERT(pTask->info.fillHistory == 1); ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStatePrepare(pTask); code = streamTransferStatePrepare(pTask);
@ -1309,7 +1432,5 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
} }
streamTrySchedExec(pTask); streamTrySchedExec(pTask);
return 0; return 0;
} }

View File

@ -541,14 +541,14 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
//static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; } //static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; } static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; }
static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock) { static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t blockSize = 0; int32_t blockSize = 0;
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
SCheckpointInfo* pInfo = &pTask->chkInfo; SCheckpointInfo* pInfo = &pTask->chkInfo;
int64_t ver = pInfo->processedVer; int64_t ver = pInfo->processedVer;
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, 1, "checkpoint-trigger"); stDebug("s-task:%s start to process batch blocks, num:%d, type:%s", id, num, streamQueueItemGetTypeStr(pBlock->type));
doSetStreamInputBlock(pTask, pBlock, &ver, id); doSetStreamInputBlock(pTask, pBlock, &ver, id);
@ -607,7 +607,7 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB
} }
// 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V. // 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V.
doStreamTaskExecImpl(pTask, pCheckpointBlock); doStreamTaskExecImpl(pTask, pCheckpointBlock, 1);
} }
/** /**
@ -643,7 +643,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
} }
if (taosGetTimestampMs() - pTask->status.lastExecTs < MIN_INVOKE_INTERVAL) { if (taosGetTimestampMs() - pTask->status.lastExecTs < MIN_INVOKE_INTERVAL) {
stDebug("s-task:%s invoke with high frequency, idle and retry exec in 50ms", id); stDebug("s-task:%s invoke exec too fast, idle and retry in 50ms", id);
streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL); streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
return 0; return 0;
} }
@ -698,7 +698,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
} }
if (type != STREAM_INPUT__CHECKPOINT) { if (type != STREAM_INPUT__CHECKPOINT) {
doStreamTaskExecImpl(pTask, pInput); doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
streamFreeQitem(pInput); streamFreeQitem(pInput);
} else { // todo other thread may change the status } else { // todo other thread may change the status
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.

View File

@ -299,8 +299,8 @@ void streamMetaRemoveDB(void* arg, char* key) {
taosThreadMutexUnlock(&pMeta->backendMutex); taosThreadMutexUnlock(&pMeta->backendMutex);
} }
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn,
startComplete_fn_t fn) { int32_t vgId, int64_t stage, startComplete_fn_t fn) {
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
if (pMeta == NULL) { if (pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -369,7 +369,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->scanInfo.scanCounter = 0; pMeta->scanInfo.scanCounter = 0;
pMeta->vgId = vgId; pMeta->vgId = vgId;
pMeta->ahandle = ahandle; pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc; pMeta->buildTaskFn = buildTaskFn;
pMeta->expandTaskFn = expandTaskFn;
pMeta->stage = stage; pMeta->stage = stage;
pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT; pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
pMeta->updateInfo.transId = -1; pMeta->updateInfo.transId = -1;
@ -602,7 +603,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
return 0; return 0;
} }
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { if (pMeta->buildTaskFn(pMeta->ahandle, pTask, ver) < 0) {
return -1; return -1;
} }
@ -901,7 +902,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p == NULL) { if (p == NULL) {
code = pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1); code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
if (code < 0) { if (code < 0) {
stError("failed to expand s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno)); stError("failed to expand s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno));
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
@ -1522,6 +1523,7 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
} }
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn expandFn) { int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn expandFn) {
int32_t code = 0;
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId); stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);
@ -1541,40 +1543,22 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
ASSERT(pTask->status.downstreamReady == 0); ASSERT(pTask->status.downstreamReady == 0);
if (pTask->pBackend == NULL) { if (pTask->pBackend == NULL) {
int32_t code = expandFn(pTask); code = expandFn(pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
if (pHTask != NULL) {
if (pHTask->pBackend == NULL) {
code = expandFn(pHTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pHTask, pInfo->readyTs);
streamMetaReleaseTask(pMeta, pHTask);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
}
streamMetaReleaseTask(pMeta, pHTask);
}
} }
} }
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); if (code == TSDB_CODE_SUCCESS) {
if (ret != TSDB_CODE_SUCCESS) { code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT); if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT);
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
}
} }
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return ret; return code;
} }
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) { static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {

View File

@ -197,6 +197,9 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
const char* idStr = pTask->id.idStr; const char* idStr = pTask->id.idStr;
int64_t hStreamId = pTask->hTaskInfo.id.streamId; int64_t hStreamId = pTask->hTaskInfo.id.streamId;
int32_t hTaskId = pTask->hTaskInfo.id.taskId; int32_t hTaskId = pTask->hTaskInfo.id.taskId;
int64_t now = taosGetTimestampMs();
int32_t code = 0;
ASSERT(hTaskId != 0); ASSERT(hTaskId != 0);
// check stream task status in the first place. // check stream task status in the first place.
@ -226,7 +229,18 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr); stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true); streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true);
} else { // exist, but not ready, continue check downstream task status } else { // exist, but not ready, continue check downstream task status
checkFillhistoryTaskStatus(pTask, pHisTask); if (pHisTask->pBackend == NULL) {
code = pMeta->expandTaskFn(pHisTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pHisTask, now);
stError("s-task:%s failed to expand fill-history task, code:%s", pHisTask->id.idStr, tstrerror(code));
}
}
if (code == TSDB_CODE_SUCCESS) {
checkFillhistoryTaskStatus(pTask, pHisTask);
}
} }
streamMetaReleaseTask(pMeta, pHisTask); streamMetaReleaseTask(pMeta, pHisTask);
@ -306,6 +320,7 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
SLaunchHTaskInfo* pInfo = param; SLaunchHTaskInfo* pInfo = param;
SStreamMeta* pMeta = pInfo->pMeta; SStreamMeta* pMeta = pInfo->pMeta;
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
int32_t code = 0;
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
@ -362,13 +377,22 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return; return;
} else { } else {
checkFillhistoryTaskStatus(pTask, pHTask); if (pHTask->pBackend == NULL) {
streamMetaReleaseTask(pMeta, pHTask); code = pMeta->expandTaskFn(pHTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pHTask, now);
stError("failed to expand fill-history task:%s, code:%s", pHTask->id.idStr, tstrerror(code));
}
}
// not in timer anymore if (code == TSDB_CODE_SUCCESS) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); checkFillhistoryTaskStatus(pTask, pHTask);
stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId, // not in timer anymore
pHTaskInfo->retryTimes, ref); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId,
pHTaskInfo->retryTimes, ref);
}
streamMetaReleaseTask(pMeta, pHTask);
} }
} }

View File

@ -283,10 +283,12 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->status.pSM = streamDestroyStateMachine(pTask->status.pSM); pTask->status.pSM = streamDestroyStateMachine(pTask->status.pSM);
streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo); streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList);
taosMemoryFree(pTask->outputInfo.pTokenBucket); taosMemoryFree(pTask->outputInfo.pTokenBucket);
taosThreadMutexDestroy(&pTask->lock); taosThreadMutexDestroy(&pTask->lock);
pTask->msgInfo.pSendInfo = taosArrayDestroy(pTask->msgInfo.pSendInfo);
taosThreadMutexDestroy(&pTask->msgInfo.lock);
pTask->outputInfo.pNodeEpsetUpdateList = taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList); pTask->outputInfo.pNodeEpsetUpdateList = taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) { if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) {
@ -373,7 +375,13 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->pMeta = pMeta; pTask->pMeta = pMeta;
pTask->pMsgCb = pMsgCb; pTask->pMsgCb = pMsgCb;
pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t)); pTask->msgInfo.pSendInfo = taosArrayInit(4, sizeof(SDispatchEntry));
if (pTask->msgInfo.pSendInfo == NULL) {
stError("s-task:%s failed to create sendInfo struct for stream task, code:Out of memory", pTask->id.idStr);
return terrno;
}
taosThreadMutexInit(&pTask->msgInfo.lock, NULL);
TdThreadMutexAttr attr = {0}; TdThreadMutexAttr attr = {0};
@ -936,32 +944,36 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) { static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock)); SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock));
int8_t status = TASK_INPUT_STATUS__NORMAL; if (pData == NULL) {
stError("s-task:%s failed to allocated retrieve-block", pTask->id.idStr);
// enqueue return terrno;
if (pData != NULL) {
stDebug("s-task:%s (child %d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
pTask->info.selfChildId, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId);
pData->type = STREAM_INPUT__DATA_RETRIEVE;
pData->srcVgId = 0;
streamRetrieveReqToData(pReq, pData);
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData) == 0) {
status = TASK_INPUT_STATUS__NORMAL;
} else {
status = TASK_INPUT_STATUS__FAILED;
}
} else { // todo handle oom
/*streamTaskInputFail(pTask);*/
/*status = TASK_INPUT_STATUS__FAILED;*/
} }
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; // enqueue
stDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
pTask->pMeta->vgId, pTask->info.taskLevel, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId);
pData->type = STREAM_INPUT__DATA_RETRIEVE;
pData->srcVgId = 0;
int32_t code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) {
taosFreeQitem(pData);
return code;
}
code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to put retrieve-block into inputQ, inputQ is full, discard the retrieve msg",
pTask->id.idStr);
}
return code;
} }
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) { int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
int32_t code = streamTaskEnqueueRetrieve(pTask, pReq); int32_t code = streamTaskEnqueueRetrieve(pTask, pReq);
if(code != 0){ if (code != 0) {
return code; return code;
} }
return streamTrySchedExec(pTask); return streamTrySchedExec(pTask);

View File

@ -43,7 +43,7 @@ SStreamState *stateCreate(const char *path) {
pTask->ver = 1024; pTask->ver = 1024;
pTask->id.streamId = 1023; pTask->id.streamId = 1023;
pTask->id.taskId = 1111111; pTask->id.taskId = 1111111;
SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, 0, 0, NULL); SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL);
pTask->pMeta = pMeta; pTask->pMeta = pMeta;
SStreamState *p = streamStateOpen((char *)path, pTask, true, 32, 32 * 1024); SStreamState *p = streamStateOpen((char *)path, pTask, true, 32, 32 * 1024);

View File

@ -24,7 +24,7 @@
#define UNIT_ONE_EXBIBYTE (UNIT_ONE_PEBIBYTE * UNIT_SIZE_CONVERT_FACTOR) #define UNIT_ONE_EXBIBYTE (UNIT_ONE_PEBIBYTE * UNIT_SIZE_CONVERT_FACTOR)
static int32_t parseCfgIntWithUnit(const char* str, double *res) { static int32_t parseCfgIntWithUnit(const char* str, double *res) {
double val, temp = INT64_MAX; double val, temp = (double)INT64_MAX;
char* endPtr; char* endPtr;
errno = 0; errno = 0;
val = taosStr2Int64(str, &endPtr, 0); val = taosStr2Int64(str, &endPtr, 0);

View File

@ -121,7 +121,7 @@ echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "uDebugFlag 131" >> $TAOS_CFG echo "uDebugFlag 131" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "jniDebugFlag 131" >> $TAOS_CFG echo "jniDebugFlag 131" >> $TAOS_CFG
echo "qDebugFlag 131" >> $TAOS_CFG echo "qDebugFlag 135" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG echo "cDebugFlag 135" >> $TAOS_CFG
echo "dDebugFlag 131" >> $TAOS_CFG echo "dDebugFlag 131" >> $TAOS_CFG
echo "vDebugFlag 131" >> $TAOS_CFG echo "vDebugFlag 131" >> $TAOS_CFG
@ -136,7 +136,7 @@ echo "idxDebugFlag 135" >> $TAOS_CFG
echo "udfDebugFlag 135" >> $TAOS_CFG echo "udfDebugFlag 135" >> $TAOS_CFG
echo "smaDebugFlag 135" >> $TAOS_CFG echo "smaDebugFlag 135" >> $TAOS_CFG
echo "metaDebugFlag 135" >> $TAOS_CFG echo "metaDebugFlag 135" >> $TAOS_CFG
echo "stDebugFlag 135" >> $TAOS_CFG echo "stDebugFlag 143" >> $TAOS_CFG
echo "numOfLogLines 20000000" >> $TAOS_CFG echo "numOfLogLines 20000000" >> $TAOS_CFG
echo "asyncLog 0" >> $TAOS_CFG echo "asyncLog 0" >> $TAOS_CFG
echo "locale en_US.UTF-8" >> $TAOS_CFG echo "locale en_US.UTF-8" >> $TAOS_CFG

View File

@ -57,7 +57,7 @@ loop1:
sleep 1000 sleep 1000
$loop_count = $loop_count + 1 $loop_count = $loop_count + 1
if $loop_count == 10 then if $loop_count == 100 then
return -1 return -1
endi endi