Merge pull request #25458 from taosdata/fix/3_liaohj
fix(stream): add task update trans conflict level.
This commit is contained in:
commit
3b08ee3d16
|
@ -35,6 +35,7 @@ extern "C" {
|
|||
#define CACHESCAN_RETRIEVE_TYPE_SINGLE 0x2
|
||||
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
|
||||
#define CACHESCAN_RETRIEVE_LAST 0x8
|
||||
#define CACHESCAN_RETRIEVE_PK 0x10
|
||||
|
||||
#define META_READER_LOCK 0x0
|
||||
#define META_READER_NOLOCK 0x1
|
||||
|
|
|
@ -9624,6 +9624,8 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
|
|||
taosArrayDestroy(pTbData->aRowP);
|
||||
}
|
||||
}
|
||||
|
||||
pTbData->aRowP = NULL;
|
||||
}
|
||||
|
||||
void tDestroySubmitReq(SSubmitReq2 *pReq, int32_t flag) {
|
||||
|
|
|
@ -98,7 +98,8 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* p
|
|||
mDebug("not conflict with checkpoint trans, name:%s, continue create trans", pTransName);
|
||||
}
|
||||
} else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) ||
|
||||
(strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0)) {
|
||||
(strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) ||
|
||||
strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) {
|
||||
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
|
||||
tInfo.name);
|
||||
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
|
|
|
@ -583,7 +583,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
|
|||
|
||||
if (IS_SET_NULL(pCol)) {
|
||||
if (pCol->flags & COL_IS_KEY) {
|
||||
qError("ts:%" PRId64 " Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts,
|
||||
qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts,
|
||||
pCol->colId, pCol->type);
|
||||
break;
|
||||
}
|
||||
|
@ -593,7 +593,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
|
|||
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
|
||||
if (colDataIsNull_s(pColData, j)) {
|
||||
if (pCol->flags & COL_IS_KEY) {
|
||||
qError("ts:%" PRId64 "Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8,
|
||||
qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8,
|
||||
ts, pCol->colId, pCol->type);
|
||||
break;
|
||||
}
|
||||
|
@ -624,8 +624,8 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
|
|||
code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
|
||||
pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
|
||||
taosArrayDestroy(pVals);
|
||||
tqError("s-task:%s build rows for submit failed, ts:%"PRId64, id, ts);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -195,12 +195,15 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
const char* idstr = pTask->id.idStr;
|
||||
|
||||
if (pMeta->updateInfo.transId != req.transId) {
|
||||
pMeta->updateInfo.transId = req.transId;
|
||||
tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", idstr, req.transId);
|
||||
ASSERT(req.transId > pMeta->updateInfo.transId);
|
||||
tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr,
|
||||
vgId, req.transId, pMeta->updateInfo.transId);
|
||||
|
||||
// info needs to be kept till the new trans to update the nodeEp arrived.
|
||||
taosHashClear(pMeta->updateInfo.pTasks);
|
||||
pMeta->updateInfo.transId = req.transId;
|
||||
} else {
|
||||
tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", idstr, req.transId);
|
||||
tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId);
|
||||
}
|
||||
|
||||
// duplicate update epset msg received, discard this redundant message
|
||||
|
|
|
@ -130,7 +130,7 @@ static void tsdbClosePgCache(STsdb *pTsdb) {
|
|||
enum {
|
||||
LFLAG_LAST_ROW = 0,
|
||||
LFLAG_LAST = 1,
|
||||
LFLAG_PRIMARY_KEY = (1 << 4),
|
||||
LFLAG_PRIMARY_KEY = CACHESCAN_RETRIEVE_PK,
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -386,7 +386,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
goto _end;
|
||||
}
|
||||
|
||||
int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3;
|
||||
int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3;
|
||||
if (pr->rowKey.numOfPKs > 0) {
|
||||
ltype |= CACHESCAN_RETRIEVE_PK;
|
||||
}
|
||||
|
||||
STableKeyInfo* pTableList = pr->pTableList;
|
||||
|
||||
// retrieve the only one last row of all tables in the uid list.
|
||||
|
|
|
@ -160,6 +160,9 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
|||
// partition by tbname
|
||||
if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) {
|
||||
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull);
|
||||
if (pInfo->numOfPks > 0) {
|
||||
pInfo->retrieveType |= CACHESCAN_RETRIEVE_PK;
|
||||
}
|
||||
|
||||
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
|
||||
|
||||
|
|
|
@ -190,6 +190,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
|||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data
|
||||
#define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms
|
||||
#define MIN_INVOKE_INTERVAL 50 // 50ms
|
||||
#define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec
|
||||
|
||||
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
|
||||
|
||||
|
@ -46,6 +47,7 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl
|
|||
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
|
||||
code = streamTaskPutDataIntoOutputQ(pTask, pBlock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
destroyStreamDataBlock(pBlock);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -76,7 +78,6 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
|
|||
|
||||
int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks);
|
||||
if (code != TSDB_CODE_SUCCESS) { // back pressure and record position
|
||||
destroyStreamDataBlock(pStreamBlocks);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -244,6 +245,10 @@ static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t*
|
|||
}
|
||||
}
|
||||
|
||||
static SScanhistoryDataInfo buildScanhistoryExecRet(EScanHistoryCode code, int32_t idleTime) {
|
||||
return (SScanhistoryDataInfo){code, idleTime};
|
||||
}
|
||||
|
||||
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
|
||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||
|
||||
|
@ -260,7 +265,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
|
|||
if (streamTaskShouldPause(pTask)) {
|
||||
stDebug("s-task:%s paused from the scan-history task", id);
|
||||
// quit from step1, not continue to handle the step2
|
||||
return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
|
||||
return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
|
||||
}
|
||||
|
||||
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
|
@ -275,7 +280,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
|
|||
|
||||
if(streamTaskShouldStop(pTask)) {
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
|
||||
return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
|
||||
}
|
||||
|
||||
// dispatch the generated results
|
||||
|
@ -285,38 +290,21 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
|
|||
|
||||
// downstream task input queue is full, try in 5sec
|
||||
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) {
|
||||
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000};
|
||||
return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL);
|
||||
}
|
||||
|
||||
if (finished) {
|
||||
return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};
|
||||
return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0);
|
||||
}
|
||||
|
||||
if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) {
|
||||
stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id,
|
||||
pTask->info.fillHistory, el / 1000.0);
|
||||
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};
|
||||
return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, 100);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// wait for the stream task to be idle
|
||||
static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
int64_t st = taosGetTimestampMs();
|
||||
while (!streamTaskIsIdle(pStreamTask)) {
|
||||
stDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", id, pTask->info.taskLevel,
|
||||
pStreamTask->id.idStr);
|
||||
taosMsleep(100);
|
||||
}
|
||||
|
||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||
if (el > 0) {
|
||||
stDebug("s-task:%s wait for stream task:%s for %.2fs to be idle", id, pStreamTask->id.idStr, el);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
const char* id = pTask->id.idStr;
|
||||
|
|
Loading…
Reference in New Issue