Merge pull request #22818 from taosdata/fix/3_liaohj

fix(stream): fix error in fill history and check quit status of vnode.

commit 372747cc23
@@ -1,5 +1,5 @@
 cmake_minimum_required(VERSION 3.0)
-set(CMAKE_VERBOSE_MAKEFILE ON)
+set(CMAKE_VERBOSE_MAKEFILE FALSE)
 set(TD_BUILD_TAOSA_INTERNAL FALSE)
 
 #set output directory

@@ -260,7 +260,7 @@ typedef struct SStreamTaskId {
 typedef struct SCheckpointInfo {
   int64_t checkpointId;
   int64_t checkpointVer;    // latest checkpointId version
-  int64_t currentVer;       // current offset in WAL, not serialize it
+  int64_t nextProcessVer;   // current offset in WAL, not serialize it
 } SCheckpointInfo;
 
 typedef struct SStreamStatus {
@@ -312,12 +312,27 @@ typedef struct STaskSchedInfo {
   void* pTimer;
 } STaskSchedInfo;
 
+typedef struct SSinkTaskRecorder {
+  int64_t numOfSubmit;
+  int64_t numOfBlocks;
+  int64_t numOfRows;
+} SSinkTaskRecorder;
+
 typedef struct {
   int64_t created;
   int64_t init;
   int64_t step1Start;
   int64_t step2Start;
+  int64_t sinkStart;
 } STaskTimestamp;
 
+typedef struct STokenBucket {
+  int32_t capacity;       // total capacity
+  int64_t fillTimestamp;  // fill timestamp
+  int32_t numOfToken;     // total available tokens
+  int32_t rate;           // number of token per second
+} STokenBucket;
+
 struct SStreamTask {
   int64_t       ver;
   SStreamTaskId id;
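Note on the new STokenBucket: it rate-limits how often the sink task may run. A minimal sketch of the token-bucket discipline these four fields imply (illustrative only; the engine's actual implementation is the streamTaskInitTokenBucket/fillBucket/streamTaskHasAvailableToken trio added later in this diff):

    // Sketch: refill by elapsed time, then try to spend one token per execution.
    static bool tokenBucketTryConsume(STokenBucket* pBucket, int64_t nowMs) {
      int64_t delta = nowMs - pBucket->fillTimestamp;
      int32_t inc = (int32_t)((delta / 1000.0) * pBucket->rate);  // tokens earned since last refill
      if (inc > 0) {
        pBucket->numOfToken = (pBucket->numOfToken + inc < pBucket->capacity)
                                  ? pBucket->numOfToken + inc
                                  : pBucket->capacity;            // never exceed capacity
        pBucket->fillTimestamp = nowMs;
      }
      if (pBucket->numOfToken > 0) {
        pBucket->numOfToken -= 1;  // one execution costs one token
        return true;
      }
      return false;  // bucket empty: caller skips this round
    }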
@@ -345,6 +360,8 @@ struct SStreamTask {
     STaskSinkSma   smaSink;
     STaskSinkFetch fetchSink;
   };
+  SSinkTaskRecorder sinkRecorder;
+  STokenBucket      tokenBucket;
 
   void*   launchTaskTimer;
   SMsgCb* pMsgCb;  // msg handle
@@ -683,7 +700,6 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask);
 int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
 
 // agg level
-int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask);
 int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
                                           SRpcHandleInfo* pRpcInfo);
 int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);

@@ -192,7 +192,7 @@ int32_t walApplyVer(SWal *, int64_t ver);
 // int32_t  walDataCorrupted(SWal*);
 
 // wal reader
-SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
+SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id);
 void        walCloseReader(SWalReader *pRead);
 void        walReadReset(SWalReader *pReader);
 int32_t     walReadVer(SWalReader *pRead, int64_t ver);

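The new third parameter lets callers stamp a WAL reader with an identifier (a stream task id, a reader id, or 0 for none), so that log lines from concurrent readers over the same vnode WAL can be told apart. A hedged usage sketch (pWal, cond, and taskId stand in for caller state):

    SWalFilterCond cond = {.deleteMsg = 1};
    SWalReader* pTraced = walOpenReader(pWal, &cond, taskId);  // log lines can carry taskId
    SWalReader* pPlain  = walOpenReader(pWal, NULL, 0);        // anonymous reader, as the tq handles below use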
@@ -86,18 +86,18 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
   SCheckpointInfo* pChkInfo = &pTask->chkInfo;
   // checkpoint ver is the kept version, handled data should be the next version.
   if (pTask->chkInfo.checkpointId != 0) {
-    pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1;
-    qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
-          pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer);
+    pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
+    qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr,
+          pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
   } else {
-    if (pTask->chkInfo.currentVer == -1) {
-      pTask->chkInfo.currentVer = 0;
+    if (pTask->chkInfo.nextProcessVer == -1) {
+      pTask->chkInfo.nextProcessVer = 0;
     }
   }
 
-  qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
+  qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64
         " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
-        SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
+        SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
        pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
        pTask->info.fillHistory, pTask->info.triggerParam);

@@ -250,7 +250,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
 int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
 int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
 int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
-int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, SRpcMsg* pMsg);
+int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
 int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);

@@ -819,7 +819,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
 
   if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
     SWalFilterCond cond = {.deleteMsg = 1};  // delete msg also extract from wal files
-    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
+    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId);
   }
 
   // reset the task status from unfinished transaction
@@ -834,14 +834,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
 
   // checkpoint ver is the kept version, handled data should be the next version.
   if (pTask->chkInfo.checkpointId != 0) {
-    pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1;
+    pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
     tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
-           pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer);
+           pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
   }
 
   tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
         " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
-        vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
+        vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
        pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
        pTask->info.fillHistory, pTask->info.triggerParam);

@@ -890,9 +890,10 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
   return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId);
 }
 
-int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
+int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
   char*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
   int32_t len = pMsg->contLen - sizeof(SMsgHead);
+  int32_t vgId = pTq->pStreamMeta->vgId;
 
   int32_t code;
   SStreamTaskCheckRsp rsp;
@@ -901,7 +902,9 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
   tDecoderInit(&decoder, (uint8_t*)pReq, len);
   code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
   if (code < 0) {
     terrno = TSDB_CODE_INVALID_MSG;
+    tDecoderClear(&decoder);
+    tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno));
     return -1;
   }
@@ -1087,14 +1090,17 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
   }
 
   // now we can stop the stream task execution
-  streamTaskHalt(pStreamTask);
+  int64_t latestVer = 0;
 
+  taosThreadMutexLock(&pStreamTask->lock);
+  streamTaskHalt(pStreamTask);
   tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
           pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
+  latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
+  taosThreadMutexUnlock(&pStreamTask->lock);
 
   // if it's an source task, extract the last version in wal.
   pRange = &pTask->dataRange.range;
-  int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
   done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
 
   if (done) {
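The reordering above is part of the fill-history fix: the stream task is now halted and its current WAL position sampled inside a single critical section, so the fill-history task computes its step-2 version range against a position that can no longer advance. Previously the WAL version was read outside any lock, after the halt, leaving a window for the reader to move. The locking pattern, condensed (names as in the hunk):

    taosThreadMutexLock(&pStreamTask->lock);
    streamTaskHalt(pStreamTask);  // stop further WAL consumption
    int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);  // stable now
    taosThreadMutexUnlock(&pStreamTask->lock);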
@@ -1115,7 +1121,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
 
   int64_t dstVer = pTask->dataRange.range.minVer - 1;
 
-  pTask->chkInfo.currentVer = dstVer;
+  pTask->chkInfo.nextProcessVer = dstVer;
   walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
   tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
           pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
@@ -1123,7 +1129,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
   atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
 
   // set the fill-history task to be normal
-  if (pTask->info.fillHistory == 1) {
+  if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) {
     streamSetStatusNormal(pTask);
   }
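This guard is one of the "check quit status" fixes named in the commit title: a fill-history task must not be flipped back to normal while the vnode is shutting down or the task is being dropped. The same pattern protects long waits elsewhere in this commit, for example while polling for a destination table to appear (sketch; resourceNotReady() is a hypothetical placeholder):

    while (resourceNotReady()) {
      if (streamTaskShouldStop(&pTask->status)) {
        return TSDB_CODE_SUCCESS;  // bail out promptly instead of spinning during shutdown
      }
      taosMsleep(100);  // retry later
    }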
@@ -1148,7 +1154,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
    tqDebug(
        "s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start "
        "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
-       id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
+       id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);
  }
 
  code = streamTaskScanHistoryDataComplete(pTask);
@@ -1283,8 +1289,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
   // even in halt status, the data in inputQ must be processed
   int8_t st = pTask->status.taskStatus;
   if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK) {
-    tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
-            pTask->chkInfo.currentVer);
+    tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
+            pTask->chkInfo.nextProcessVer);
     streamProcessRunReq(pTask);
   } else {
     atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
@@ -1423,10 +1429,10 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
     walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
     tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
             ", schedStatus:%d",
-            vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
+            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
   } else {  // from the previous paused version and go on
     tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
-            vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
+            vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
   }
 
   if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory &&

@@ -312,14 +312,14 @@ static int buildHandle(STQ* pTq, STqHandle* handle){
       return -1;
     }
   } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
-    handle->pWalReader = walOpenReader(pVnode->pWal, NULL);
+    handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
     handle->execHandle.pTqReader = tqReaderOpen(pVnode);
 
     buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta,
                      (SSnapContext**)(&reader.sContext));
     handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
   } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
-    handle->pWalReader = walOpenReader(pVnode->pWal, NULL);
+    handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
 
     if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
       if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {

@@ -187,25 +187,26 @@ end:
 int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
   int32_t code = -1;
   int32_t vgId = TD_VID(pTq->pVnode);
+  int64_t id = pHandle->pWalReader->readerId;
 
   int64_t offset = *fetchOffset;
   int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
   int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
   int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
 
-  wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64,
-         vgId, offset, lastVer, committedVer, appliedVer);
+  wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64", 0x%"PRIx64,
+         vgId, offset, lastVer, committedVer, appliedVer, id);
 
   while (offset <= appliedVer) {
     if (walFetchHead(pHandle->pWalReader, offset) < 0) {
       tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
-              ", no more log to return, reqId:0x%" PRIx64,
-              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
+              ", no more log to return, reqId:0x%" PRIx64 " 0x%" PRIx64,
+              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
       goto END;
     }
 
-    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId,
-            pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId);
+    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64" 0x%"PRIx64, vgId,
+            pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
 
     if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
       code = walFetchBody(pHandle->pWalReader);
@@ -250,7 +251,7 @@ STqReader* tqReaderOpen(SVnode* pVnode) {
     return NULL;
   }
 
-  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);
+  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
   if (pReader->pWalReader == NULL) {
     taosMemoryFree(pReader);
     return NULL;
@@ -494,7 +495,7 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
       tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64 ", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
       return true;
     } else {
-      tqInfo("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
+      tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
              taosHashGetSize(pReader->tbIdHash), idstr);
     }

@@ -24,10 +24,13 @@ typedef struct STableSinkInfo {
   tstr    name;
 } STableSinkInfo;
 
-static int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid,
-                                 SSDataBlock* pDataBlock, SStreamTask* pTask);
-static int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
+static int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid,
+                                         SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData);
+static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
                                        int64_t suid);
+static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
+static void    fillBucket(STokenBucket* pBucket);
+static bool    hasAvailableToken(STokenBucket* pBucket);
 
 int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
                          const char* pIdStr) {
@@ -133,34 +136,18 @@ static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
   return TSDB_CODE_SUCCESS;
 }
 
-
-void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
-  const SArray* pBlocks = (const SArray*)data;
-  SVnode* pVnode = (SVnode*)vnode;
-  int64_t suid = pTask->tbSink.stbUid;
-  char* stbFullName = pTask->tbSink.stbFullName;
-  STSchema* pTSchema = pTask->tbSink.pTSchema;
-  int32_t vgId = TD_VID(pVnode);
-  int32_t numOfBlocks = taosArrayGetSize(pBlocks);
-  int32_t code = TSDB_CODE_SUCCESS;
-
-  tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table", vgId, pTask->id.idStr, numOfBlocks);
-
-  SArray* tagArray = NULL;
-  SArray* pVals = NULL;
-  SArray* crTblArray = NULL;
-
-  for (int32_t i = 0; i < numOfBlocks; i++) {
-    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
-    int32_t rows = pDataBlock->info.rows;
-
-    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
-      code = doSinkDeleteBlock(pVnode, stbFullName, pDataBlock, pTask, suid);
-    } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
+static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
+                                            int64_t suid) {
+  tqDebug("s-task:%s build create table msg", pTask->id.idStr);
+
+  STSchema* pTSchema = pTask->tbSink.pTSchema;
+  int32_t   rows = pDataBlock->info.rows;
+  SArray*   tagArray = NULL;
+  int32_t   code = 0;
 
   SVCreateTbBatchReq reqs = {0};
-  crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
-
+  SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
   if (NULL == reqs.pArray) {
     goto _end;
   }
@@ -182,7 +169,6 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
   int32_t size = taosArrayGetSize(pDataBlock->pDataBlock);
   if (size == 2) {
     tagArray = taosArrayInit(1, sizeof(STagVal));
-
     if (!tagArray) {
       tdDestroySVCreateTbReq(pCreateTbReq);
       goto _end;
@@ -220,7 +206,6 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
       }
       taosArrayPush(tagArray, &tagVal);
     }
-
   }
   pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
@@ -229,7 +214,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
   tagArray = taosArrayDestroy(tagArray);
   if (pTag == NULL) {
     tdDestroySVCreateTbReq(pCreateTbReq);
-    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    code = TSDB_CODE_OUT_OF_MEMORY;
     goto _end;
   }
@@ -238,6 +223,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
   // set table name
   if (!pDataBlock->info.parTbName[0]) {
     SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
+
     void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
     pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData);
   } else {
@@ -249,30 +235,200 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
   }
 
   reqs.nReqs = taosArrayGetSize(reqs.pArray);
-  if (tqPutReqToQueue(pVnode, &reqs) != TSDB_CODE_SUCCESS) {
-    goto _end;
+  code = tqPutReqToQueue(pVnode, &reqs);
+  if (code != TSDB_CODE_SUCCESS) {
+    tqError("s-task:%s failed to send create table msg", pTask->id.idStr);
   }
 
+  tagArray = taosArrayDestroy(tagArray);
   taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
-  crTblArray = NULL;
-  } else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
-    continue;
-  } else {
-    code = doSinkResultBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask);
-  }
-  }
-
-  tqDebug("vgId:%d, s-task:%s write results completed", vgId, pTask->id.idStr);
-
 _end:
-  taosArrayDestroy(tagArray);
-  taosArrayDestroy(pVals);
-  taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
+  return code;
 }
+
+static int32_t doBuildSubmitAndSendMsg(SVnode* pVnode, SStreamTask* pTask, int32_t numOfBlocks, SSubmitReq2* pReq) {
+  const char* id = pTask->id.idStr;
+  int32_t     vgId = TD_VID(pVnode);
+  int32_t     len = 0;
+  void*       pBuf = NULL;
+
+  int32_t code = tqBuildSubmitReq(pReq, vgId, &pBuf, &len);
+  if (code != TSDB_CODE_SUCCESS) {
+    tqError("s-task:%s build submit msg failed, vgId:%d, code:%s", id, vgId, tstrerror(code));
+    return code;
+  }
+
+  SRpcMsg msg = {.msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len};
+  code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
+  if (code == TSDB_CODE_SUCCESS) {
+    tqDebug("s-task:%s vgId:%d send submit %d blocks into dstTables completed", id, vgId, numOfBlocks);
+  } else {
+    tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
+  }
+
+  pTask->sinkRecorder.numOfSubmit += 1;
+
+  if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) {
+    SSinkTaskRecorder* pRec = &pTask->sinkRecorder;
+    double el = (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0;
+    tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
+           " submit into dst table, duration:%.2f Sec.",
+           pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, el);
+  }
+
+  return TSDB_CODE_SUCCESS;
+}
+
+void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
+  const SArray* pBlocks = (const SArray*)data;
+  SVnode*       pVnode = (SVnode*)vnode;
+  int64_t       suid = pTask->tbSink.stbUid;
+  char*         stbFullName = pTask->tbSink.stbFullName;
+  STSchema*     pTSchema = pTask->tbSink.pTSchema;
+  int32_t       vgId = TD_VID(pVnode);
+  int32_t       numOfBlocks = taosArrayGetSize(pBlocks);
+  int32_t       code = TSDB_CODE_SUCCESS;
+  const char*   id = pTask->id.idStr;
+
+  if (pTask->tsInfo.sinkStart == 0) {
+    pTask->tsInfo.sinkStart = taosGetTimestampMs();
+  }
+
+  bool isMixBlocks = true;
+  for(int32_t i = 0; i < numOfBlocks; ++i) {
+    SSDataBlock* p = taosArrayGet(pBlocks, i);
+    if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
+      isMixBlocks = true;
+      break;
+    }
+  }
+
+  if (isMixBlocks) {
+    tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
+            numOfBlocks);
+
+    for(int32_t i = 0; i < numOfBlocks; ++i) {
+      SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
+      if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
+        code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
+      } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
+        code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
+      } else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
+        continue;
+      } else {
+        pTask->sinkRecorder.numOfBlocks += 1;
+
+        SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
+        if (submitReq.aSubmitTbData == NULL) {
+          code = TSDB_CODE_OUT_OF_MEMORY;
+          tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
+          return;
+        }
+
+        SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
+        code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData);
+        taosArrayPush(submitReq.aSubmitTbData, &tbData);
+
+        code = doBuildSubmitAndSendMsg(pVnode, pTask, 1, &submitReq);
+      }
+    }
+  } else {
+    tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
+    SHashObj* pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
+
+    SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
+    if (submitReq.aSubmitTbData == NULL) {
+      code = TSDB_CODE_OUT_OF_MEMORY;
+      tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
+      taosHashCleanup(pTableIndexMap);
+      return;
+    }
+
+    bool hasSubmit = false;
+
+    for (int32_t i = 0; i < numOfBlocks; i++) {
+      SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
+      if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
+        code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
+      } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
+        code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
+      } else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
+        continue;
+      } else {
+        hasSubmit = true;
+        pTask->sinkRecorder.numOfBlocks += 1;
+
+        SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
+        code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData);
+
+        int32_t* index = taosHashGet(pTableIndexMap, &tbData.uid, sizeof(tbData.uid));
+        if (index == NULL) {  // no data yet, append it
+          taosArrayPush(submitReq.aSubmitTbData, &tbData);
+
+          int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
+          taosHashPut(pTableIndexMap, &tbData.uid, sizeof(tbData.uid), &size, sizeof(size));
+        } else {
+          SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index);
+          // merge the new submit table block with the existed blocks
+          // if ts in the new data block overlap with existed one, replace it
+          int32_t oldLen = taosArrayGetSize(pExisted->aRowP);
+          int32_t newLen = taosArrayGetSize(tbData.aRowP);
+
+          int32_t j = 0, k = 0;
+          SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES);
+          while (j < newLen && k < oldLen) {
+            SRow* pNewRow = taosArrayGetP(tbData.aRowP, j);
+            SRow* pOldRow = taosArrayGetP(pExisted->aRowP, k);
+            if (pNewRow->ts <= pOldRow->ts) {
+              taosArrayPush(pFinal, &pNewRow);
+              if (pNewRow->ts < pOldRow->ts) {
+                j += 1;
+              } else {
+                j += 1;
+                k += 1;
+              }
+            } else {
+              taosArrayPush(pFinal, &pOldRow);
+              k += 1;
+            }
+          }
+
+          while (j < newLen) {
+            SRow* pRow = taosArrayGetP(tbData.aRowP, j++);
+            taosArrayPush(pFinal, &pRow);
+          }
+
+          while (k < oldLen) {
+            SRow* pRow = taosArrayGetP(pExisted->aRowP, k++);
+            taosArrayPush(pFinal, &pRow);
+          }
+
+          taosArrayDestroy(tbData.aRowP);
+          taosArrayDestroy(pExisted->aRowP);
+          pExisted->aRowP = pFinal;
+
+          tqDebug("s-task:%s rows merged, final rows:%d, uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
+                  (int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL), (tbData.pCreateTbReq != NULL));
+        }
+
+        pTask->sinkRecorder.numOfRows += pDataBlock->info.rows;
+      }
+    }
+
+    taosHashCleanup(pTableIndexMap);
+
+    if (hasSubmit) {
+      doBuildSubmitAndSendMsg(pVnode, pTask, numOfBlocks, &submitReq);
+    } else {
+      tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
+      tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
+    }
+  }
+
+  // TODO: change
+}
 
-int32_t doSinkDeleteBlock(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
+int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
                           int64_t suid) {
   SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
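A note on the merge path above: rows bound for the same table uid are combined from several result blocks. Each aRowP array is already timestamp-ascending, so the code runs a two-pointer merge in which a row from the new block wins over an existing row carrying the same timestamp (both indices advance on a tie and the old row is dropped). A self-contained sketch of that discipline (Row stands in for SRow):

    typedef struct { int64_t ts; } Row;  // stand-in for SRow

    // Merge two ts-ascending arrays; on equal ts the new row replaces the old one.
    static size_t mergeRows(Row** newR, size_t nNew, Row** oldR, size_t nOld, Row** out) {
      size_t j = 0, k = 0, n = 0;
      while (j < nNew && k < nOld) {
        if (newR[j]->ts <= oldR[k]->ts) {
          out[n++] = newR[j];
          if (newR[j]->ts == oldR[k]->ts) {
            k++;  // overlapping timestamp: discard the existing row
          }
          j++;
        } else {
          out[n++] = oldR[k++];
        }
      }
      while (j < nNew) out[n++] = newR[j++];  // drain the remainders
      while (k < nOld) out[n++] = oldR[k++];
      return n;
    }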
@@ -390,7 +546,6 @@ static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSi
   int32_t code = tqPutTableInfo(pSinkTableMap, groupId, pTableSinkInfo);
   if (code != TSDB_CODE_SUCCESS) {
     taosMemoryFreeClear(pTableSinkInfo);
-
     tqError("s-task:%s failed to put tableSinkInfo in to cache, code:%s", id, tstrerror(code));
   } else {
     tqDebug("s-task:%s new dst table:%s(uid:%" PRIu64 ") added into cache, total:%d", id, pTableSinkInfo->name.data,
             pTableSinkInfo->uid, tSimpleHashGetSize(pSinkTableMap));
@@ -399,26 +554,75 @@ static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSi
   return code;
 }
 
-int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid, SSDataBlock* pDataBlock,
-                          SStreamTask* pTask) {
+int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) {
+  int32_t code = 0;
+  void*   pBuf = NULL;
+  *msgLen = 0;
+
+  // encode
+  int32_t len = 0;
+  tEncodeSize(tEncodeSubmitReq, pSubmitReq, len, code);
+
+  SEncoder encoder;
+  len += sizeof(SSubmitReq2Msg);
+
+  pBuf = rpcMallocCont(len);
+  if (NULL == pBuf) {
+    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
+    return TSDB_CODE_OUT_OF_MEMORY;
+  }
+
+  ((SSubmitReq2Msg*)pBuf)->header.vgId = vgId;
+  ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
+  ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
+
+  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
+  if (tEncodeSubmitReq(&encoder, pSubmitReq) < 0) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    tqError("failed to encode submit req, code:%s, ignore and continue", terrstr());
+    tEncoderClear(&encoder);
+    rpcFreeCont(pBuf);
+    tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
+    return code;
+  }
+
+  tEncoderClear(&encoder);
+  tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE);
+
+  *msgLen = len;
+  *pMsg = pBuf;
+  return TSDB_CODE_SUCCESS;
+}
+
+static int32_t tsAscendingSortFn(const void* p1, const void* p2) {
+  SRow* pRow1 = *(SRow**) p1;
+  SRow* pRow2 = *(SRow**) p2;
+
+  if (pRow1->ts == pRow2->ts) {
+    return 0;
+  } else {
+    return pRow1->ts > pRow2->ts? 1:-1;
+  }
+}
+
+int32_t doBuildSubmitFromResBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName, int64_t suid,
+                                  SSDataBlock* pDataBlock, SStreamTask* pTask, SSubmitTbData* pTableData) {
   int32_t numOfRows = pDataBlock->info.rows;
   int32_t vgId = TD_VID(pVnode);
   uint64_t groupId = pDataBlock->info.id.groupId;
   STSchema* pTSchema = pTask->tbSink.pTSchema;
   int32_t code = TSDB_CODE_SUCCESS;
   void* pBuf = NULL;
   SArray* pVals = NULL;
   const char* id = pTask->id.idStr;
 
-  SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
-  tqDebug("s-task:%s sink data pipeline, build submit msg from %d-th resBlock, including %d rows, dst suid:%" PRId64,
+  tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64,
          id, blockIndex + 1, numOfRows, suid);
 
-  tbData.aRowP = taosArrayInit(numOfRows, sizeof(SRow*));
+  pTableData->aRowP = taosArrayInit(numOfRows, sizeof(SRow*));
   pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
 
-  if (tbData.aRowP == NULL || pVals == NULL) {
-    taosArrayDestroy(tbData.aRowP);
+  if (pTableData->aRowP == NULL || pVals == NULL) {
+    pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
     taosArrayDestroy(pVals);
 
     code = TSDB_CODE_OUT_OF_MEMORY;
@@ -459,13 +663,21 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName,
   }
 
   if (exist) {
-    tbData.uid = pTableSinkInfo->uid;
+    pTableData->uid = pTableSinkInfo->uid;
 
-    if (tbData.uid == 0) {
+    if (pTableData->uid == 0) {
       tqDebug("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id);
     }
 
     while (pTableSinkInfo->uid == 0) {
+      if (streamTaskShouldStop(&pTask->status)) {
+        tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName);
+        pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
+        taosArrayDestroy(pVals);
+
+        return TSDB_CODE_SUCCESS;
+      }
+
       // wait for the table to be created
       SMetaReader mr = {0};
       metaReaderDoInit(&mr, pVnode->pMeta, 0);
@@ -476,21 +688,21 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName,
       if (!isValid) {  // not valid table, ignore it
         metaReaderClear(&mr);
 
-        taosArrayDestroy(tbData.aRowP);
+        pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
         taosArrayDestroy(pVals);
 
         return TSDB_CODE_SUCCESS;
       } else {
         tqDebug("s-task:%s set uid:%"PRIu64" for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data);
 
-        tbData.uid = mr.me.uid;
+        pTableData->uid = mr.me.uid;
         pTableSinkInfo->uid = mr.me.uid;
         metaReaderClear(&mr);
       }
     } else {  // not exist, wait and retry
       metaReaderClear(&mr);
       taosMsleep(100);
-      tqDebug("s-task:%s wait for the table:%s ready before insert data", id, dstTableName);
+      tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName);
     }
   }
@@ -510,12 +722,12 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName,
 
       tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName);
 
-      tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
-      tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock);
-      if (tbData.pCreateTbReq == NULL) {
+      pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
+      pTableData->pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock);
+      if (pTableData->pCreateTbReq == NULL) {
         tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno));
 
-        taosArrayDestroy(tbData.aRowP);
+        pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
         taosArrayDestroy(pVals);
 
         return terrno;
@@ -527,14 +739,14 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName,
       if (!isValid) {
         metaReaderClear(&mr);
         taosMemoryFree(pTableSinkInfo);
-        taosArrayDestroy(tbData.aRowP);
+        pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
         taosArrayDestroy(pVals);
         return TSDB_CODE_SUCCESS;
       } else {
-        tbData.uid = mr.me.uid;
+        pTableData->uid = mr.me.uid;
         metaReaderClear(&mr);
 
-        doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, tbData.uid, id);
+        doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, pTableData->uid, id);
       }
     }
   }
@@ -565,7 +777,7 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName,
         void* colData = colDataGetData(pColData, j);
         if (IS_STR_DATA_TYPE(pCol->type)) {
           // address copy, no value
-          SValue sv = (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)};
+          SValue sv = (SValue){.nData = varDataLen(colData), .pData = (uint8_t*) varDataVal(colData)};
           SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
           taosArrayPush(pVals, &cv);
         } else {
@@ -582,68 +794,19 @@ int32_t doSinkResultBlock(SVnode* pVnode, int32_t blockIndex, char* stbFullName,
     SRow* pRow = NULL;
     code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
     if (code != TSDB_CODE_SUCCESS) {
-      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
+      tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
 
-      taosArrayDestroy(tbData.aRowP);
+      pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
       taosArrayDestroy(pVals);
       return code;
     }
 
     ASSERT(pRow);
-    taosArrayPush(tbData.aRowP, &pRow);
+    taosArrayPush(pTableData->aRowP, &pRow);
   }
 
-  SSubmitReq2 submitReq = {0};
-  if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
-    tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
-
-    taosArrayDestroy(tbData.aRowP);
-    taosArrayDestroy(pVals);
-    return TSDB_CODE_OUT_OF_MEMORY;
-  }
-
-  taosArrayPush(submitReq.aSubmitTbData, &tbData);
-
-  // encode
-  int32_t len = 0;
-  tEncodeSize(tEncodeSubmitReq, &submitReq, len, code);
-
-  SEncoder encoder;
-  len += sizeof(SSubmitReq2Msg);
-
-  pBuf = rpcMallocCont(len);
-  if (NULL == pBuf) {
-    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
-    taosArrayDestroy(tbData.aRowP);
-    taosArrayDestroy(pVals);
-  }
-
-  ((SSubmitReq2Msg*)pBuf)->header.vgId = vgId;
-  ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
-  ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
-
-  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
-  if (tEncodeSubmitReq(&encoder, &submitReq) < 0) {
-    terrno = TSDB_CODE_OUT_OF_MEMORY;
-    tqError("failed to encode submit req, code:%s, ignore and continue", terrstr());
-    tEncoderClear(&encoder);
-    rpcFreeCont(pBuf);
-    tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
-
-    return code;
-  }
-
-  tEncoderClear(&encoder);
-  tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
-
-  SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len };
-  code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg);
-
-  if(code == TSDB_CODE_SUCCESS) {
-    tqDebug("s-task:%s send submit msg to dstTable:%s, numOfRows:%d", id, pTableSinkInfo->name.data, numOfRows);
-  } else {
-    tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
-  }
+  taosArraySort(pTableData->aRowP, tsAscendingSortFn);
+  tqDebug("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
 
   taosArrayDestroy(pVals);
   return code;

@@ -112,7 +112,7 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
 
   int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
   if (numOfTasks == 0) {
-    tqInfo("vgId:%d no stream tasks exist", vgId);
+    tqDebug("vgId:%d no stream tasks existed to run", vgId);
     taosWUnLockLatch(&pMeta->lock);
     return 0;
   }
@@ -150,7 +150,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
 
   int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
   if (numOfTasks == 0) {
-    tqInfo("vgId:%d no stream tasks exist", vgId);
+    tqDebug("vgId:%d no stream tasks existed to run", vgId);
     taosWUnLockLatch(&pMeta->lock);
     return 0;
   }
@@ -256,36 +256,36 @@ int32_t tqStartStreamTasks(STQ* pTq) {
 int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
   // seek the stored version and extract data from WAL
   int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
-  if (pTask->chkInfo.currentVer < firstVer) {
+  if (pTask->chkInfo.nextProcessVer < firstVer) {
     tqWarn("vgId:%d s-task:%s ver:%" PRId64 " earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64,
-           vgId, pTask->id.idStr, pTask->chkInfo.currentVer, firstVer, firstVer);
+           vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, firstVer, firstVer);
 
-    pTask->chkInfo.currentVer = firstVer;
+    pTask->chkInfo.nextProcessVer = firstVer;
 
     // todo need retry if failed
-    int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
+    int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
     if (code != TSDB_CODE_SUCCESS) {
       return code;
     }
 
     // append the data for the stream
-    tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
+    tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer);
   } else {
     int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
     if (currentVer == -1) {  // we only seek the read for the first time
-      int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
+      int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
       if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
         return code;
       }
 
       // append the data for the stream
       tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr,
-              pTask->chkInfo.currentVer);
+              pTask->chkInfo.nextProcessVer);
     }
   }
 
   int64_t skipToVer = walReaderGetSkipToVersion(pTask->exec.pWalReader);
-  if (skipToVer != 0 && skipToVer > pTask->chkInfo.currentVer) {
+  if (skipToVer != 0 && skipToVer > pTask->chkInfo.nextProcessVer) {
     int32_t code = walReaderSeekVer(pTask->exec.pWalReader, skipToVer);
     if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
       return code;
@@ -304,7 +304,7 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
 
   if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
     if (!pTask->status.appendTranstateBlock) {
-      qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
+      qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64
            ", not scan wal anymore, add transfer-state block into inputQ",
            id, ver, maxVer);
 
@@ -313,7 +313,7 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
       /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
       /*int32_t code = */ streamSchedExec(pTask);
     } else {
-      qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal",
+      qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal",
            id, ver, maxVer);
     }
   }
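For context: once a fill-history source task has scanned the WAL past dataRange.range.maxVer it must stop scanning, and the appendTranstateBlock flag ensures the transfer-state block is enqueued at most once, after which ownership passes to the related stream task. Condensed control flow of the function above (names from the hunk):

    if (pTask->info.fillHistory == 1 && ver > pTask->dataRange.range.maxVer) {
      if (!pTask->status.appendTranstateBlock) {  // first time past the boundary
        streamTaskPutTranstateIntoInputQ(pTask);  // hand state over to the stream task
        streamSchedExec(pTask);                   // make sure the block gets processed
      }                                           // otherwise: only stop scanning
    }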
@@ -396,25 +396,23 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
     int32_t numOfItems = streamTaskGetInputQItems(pTask);
     int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
 
-    taosThreadMutexLock(&pTask->lock);
-
-    pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
-    if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
-      tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus);
-      taosThreadMutexUnlock(&pTask->lock);
-      streamMetaReleaseTask(pStreamMeta, pTask);
-      continue;
-    }
-
     SStreamQueueItem* pItem = NULL;
     code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, pTask->id.idStr);
 
     if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) {  // failed, continue
       handleFillhistoryScanComplete(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader));
       streamMetaReleaseTask(pStreamMeta, pTask);
       continue;
     }
 
+    taosThreadMutexLock(&pTask->lock);
+    pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
+
+    if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
+      tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus);
+      taosThreadMutexUnlock(&pTask->lock);
+      streamMetaReleaseTask(pStreamMeta, pTask);
+      if (pItem != NULL) {
+        streamFreeQitem(pItem);
+      }
+      continue;
+    }
@@ -423,12 +421,12 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
       code = streamTaskPutDataIntoInputQ(pTask, pItem);
       if (code == TSDB_CODE_SUCCESS) {
         int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
-        pTask->chkInfo.currentVer = ver;
+        pTask->chkInfo.nextProcessVer = ver;
         handleFillhistoryScanComplete(pTask, ver);
         tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver);
       } else {
         tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr,
-                pTask->chkInfo.currentVer);
+                pTask->chkInfo.nextProcessVer);
       }
     }

@@ -756,7 +756,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
     case TDMT_VND_STREAM_TASK_CHECK:
       return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
     case TDMT_VND_STREAM_TASK_CHECK_RSP:
-      return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pMsg);
+      return tqProcessStreamTaskCheckRsp(pVnode->pTq, pMsg);
     case TDMT_STREAM_RETRIEVE:
       return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
     case TDMT_STREAM_RETRIEVE_RSP:
@@ -1443,8 +1443,11 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
 
       SColData *pColData = (SColData *)taosArrayGet(pSubmitTbData->aCol, 0);
       TSKEY    *aKey = (TSKEY *)(pColData->pData);
+      vDebug("vgId:%d submit %d rows data, uid:%"PRId64, TD_VID(pVnode), pColData->nVal, pSubmitTbData->uid);
 
       for (int32_t iRow = 0; iRow < pColData->nVal; iRow++) {
+        vDebug("vgId:%d uid:%"PRId64" ts:%"PRId64, TD_VID(pVnode), pSubmitTbData->uid, aKey[iRow]);
+
         if (aKey[iRow] < minKey || aKey[iRow] > maxKey || (iRow > 0 && aKey[iRow] <= aKey[iRow - 1])) {
           code = TSDB_CODE_INVALID_MSG;
           vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver);
@@ -1456,10 +1459,13 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
       int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
       SRow  **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
 
+      vDebug("vgId:%d submit %d rows data, uid:%"PRId64, TD_VID(pVnode), nRow, pSubmitTbData->uid);
       for (int32_t iRow = 0; iRow < nRow; ++iRow) {
+        vDebug("vgId:%d uid:%"PRId64" ts:%"PRId64, TD_VID(pVnode), pSubmitTbData->uid, aRow[iRow]->ts);
+
         if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) {
           code = TSDB_CODE_INVALID_MSG;
-          vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver);
+          vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);
           goto _exit;
         }
       }
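The validation loop itself is unchanged here (only the per-row logging and the tstrerror(code) argument are new); it enforces that every submitted timestamp falls inside the retention window and is strictly ascending within the batch. The invariant, stated as a predicate (illustrative):

    // Accept row i iff it lies inside [minKey, maxKey] and is strictly after row i-1.
    static bool rowTsValid(const int64_t* ts, int32_t i, int64_t minKey, int64_t maxKey) {
      if (ts[i] < minKey || ts[i] > maxKey) return false;  // outside retention window
      if (i > 0 && ts[i] <= ts[i - 1]) return false;       // duplicate or out-of-order timestamp
      return true;
    }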
@@ -1564,6 +1570,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
     } else {  // create table failed
       if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
         code = terrno;
+        vError("vgId:%d failed to create table:%s, code:%s", TD_VID(pVnode), pSubmitTbData->pCreateTbReq->name,
+               tstrerror(terrno));
         goto _exit;
       }
       terrno = 0;

@@ -2257,7 +2257,7 @@ FETCH_NEXT_BLOCK:
       int32_t current = pInfo->validBlockIndex++;
       SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
 
-      qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id);
+      qDebug("set %d/%d as the input submit block, %s", current + 1, totalBlocks, id);
       if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
         qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, totalBlocks, id);
         continue;

@@ -77,6 +77,8 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
 int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
 int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
 
+int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate);
+
 #ifdef __cplusplus
 }
 #endif

@@ -1591,7 +1591,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
   return 0;
 }
 int streamStateOpenBackend(void* backend, SStreamState* pState) {
-  // qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId);
   taosAcquireRef(streamBackendId, pState->streamBackendRid);
   SBackendWrapper*   handle = backend;
   SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper));

@@ -287,7 +287,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
     streamTaskOpenAllUpstreamInput(p);  // open inputQ for all upstream tasks
     qDebug("vgId:%d s-task:%s level:%d commit task status after checkpoint completed, checkpointId:%" PRId64
            ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s",
-           pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.currentVer,
+           pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer,
           streamGetTaskStatusStr(prev));
   }

@@ -1001,10 +1001,17 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
   // so the TASK_INPUT_STATUS_BLOCKED is rsp
   if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
     pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED;   // block the input of current task, to push pressure to upstream
+
+    double el = 0;
     if (pTask->msgInfo.blockingTs == 0) {
       pTask->msgInfo.blockingTs = taosGetTimestampMs();  // record the blocking start time
+    } else {
+      el = (taosGetTimestampMs() - pTask->msgInfo.blockingTs) / 1000.0;
     }
 
     int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
-    qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data, ref:%d",
-           id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, ref);
+    qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64
+           " wait for %dms and retry dispatch data, total wait:%.2fSec ref:%d",
+           id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, el, ref);
     streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
   } else {  // pipeline send data in output queue
     // this message has been sent successfully, let's try next one.

@@ -84,7 +84,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
     }
 
     if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
-      qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr);
+      qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry exec task", pTask->id.idStr);
       taosMsleep(1000);
       continue;
     }
@@ -563,11 +563,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
             SIZE_IN_MB(resSize), totalBlocks);
 
     // update the currentVer if processing the submit blocks.
-    ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.currentVer && ver >= pTask->chkInfo.checkpointVer);
+    ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.nextProcessVer && ver >= pTask->chkInfo.checkpointVer);
 
     if (ver != pTask->chkInfo.checkpointVer) {
-      qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64, pTask->id.idStr,
-             pTask->chkInfo.checkpointVer, ver);
+      qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64 " , currentVer:%" PRId64,
+             pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.nextProcessVer);
       pTask->chkInfo.checkpointVer = ver;
     }

@@ -29,6 +29,8 @@ typedef struct SQueueReader {
   int32_t waitDuration;  // maximum wait time to format several block into a batch to process, unit: ms
 } SQueueReader;
 
+static bool streamTaskHasAvailableToken(STokenBucket* pBucket);
+
 static void streamQueueCleanup(SStreamQueue* pQueue) {
   void* qItem = NULL;
   while ((qItem = streamQueueNextItem(pQueue)) != NULL) {
@@ -175,6 +177,14 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
     return TSDB_CODE_SUCCESS;
   }
 
+  STokenBucket* pBucket = &pTask->tokenBucket;
+  bool has = streamTaskHasAvailableToken(pBucket);
+  if (!has) {  // no available token in th bucket, ignore this execution
+    // qInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit", pTask->id.idStr,
+    //       pBucket->capacity, pBucket->rate);
+    return TSDB_CODE_SUCCESS;
+  }
+
   SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue);
   if (qItem == NULL) {
     qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
@@ -264,9 +274,9 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
   if (type == STREAM_INPUT__DATA_SUBMIT) {
     SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
     if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) {
-      qError(
-          "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
-          pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
+      // qError(
+      //     "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
+      //     pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
       streamDataSubmitDestroy(px);
       taosFreeQitem(pItem);
       return -1;
@ -288,8 +298,8 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
             type == STREAM_INPUT__REF_DATA_BLOCK) {
    if (streamQueueIsFull(pQueue)) {
      qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
             pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
      // qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
      //        pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
      destroyStreamDataBlock((SStreamDataBlock*)pItem);
      return -1;
    }
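
The two queue hunks above implement drop-on-full back pressure: when the input queue is full, the producer destroys the item and returns -1 instead of blocking. A minimal sketch of that policy, assuming a hypothetical bounded queue (BoundedQueue, queuePush, and the capacity constant are illustrative, not the stream library API):

#include <stdint.h>

#define QUEUE_CAPACITY 1024  // illustrative stand-in for STREAM_TASK_INPUT_QUEUE_CAPACITY

typedef struct {
  void*   items[QUEUE_CAPACITY];
  int32_t count;
} BoundedQueue;

// Mirrors the pattern above: on a full queue, free the item and report
// failure so the caller can apply back pressure upstream.
static int32_t queuePush(BoundedQueue* q, void* pItem, void (*destroyFn)(void*)) {
  if (q->count >= QUEUE_CAPACITY) {
    destroyFn(pItem);
    return -1;
  }
  q->items[q->count++] = pItem;
  return 0;
}
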
@ -320,3 +330,45 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)

  return 0;
}

int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) {
  if (cap < 100 || rate < 50 || pBucket == NULL) {
    qError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate);
    return TSDB_CODE_INVALID_PARA;
  }

  pBucket->capacity = cap;
  pBucket->rate = rate;
  pBucket->numOfToken = cap;
  pBucket->fillTimestamp = taosGetTimestampMs();
  return TSDB_CODE_SUCCESS;
}

static void fillBucket(STokenBucket* pBucket) {
  int64_t now = taosGetTimestampMs();
  int64_t delta = now - pBucket->fillTimestamp;
  ASSERT(pBucket->numOfToken >= 0);

  int32_t inc = (delta / 1000.0) * pBucket->rate;
  if (inc > 0) {
    if ((pBucket->numOfToken + inc) < pBucket->capacity) {
      pBucket->numOfToken += inc;
    } else {
      pBucket->numOfToken = pBucket->capacity;
    }

    pBucket->fillTimestamp = now;
    qDebug("new token available, current:%d, inc:%d ts:%" PRId64, pBucket->numOfToken, inc, now);
  }
}

bool streamTaskHasAvailableToken(STokenBucket* pBucket) {
  fillBucket(pBucket);
  if (pBucket->numOfToken > 0) {
    // qDebug("current token:%d", pBucket->numOfToken);
    --pBucket->numOfToken;
    return true;
  } else {
    return false;
  }
}
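
Taken together, streamTaskInitTokenBucket, fillBucket, and streamTaskHasAvailableToken form a classic token-bucket rate limiter for the sink: tokens refill at rate per second up to capacity, and each sink execution consumes one. Note one subtlety: fillTimestamp only advances when at least one whole token was added, so sub-second refill credit accumulates instead of being lost. A self-contained sketch of the same algorithm (nowMs stands in for taosGetTimestampMs, which is assumed to return wall-clock milliseconds):

#include <stdbool.h>
#include <stdint.h>
#include <time.h>

typedef struct {
  int32_t capacity;       // maximum number of stored tokens
  int64_t fillTimestamp;  // time of the last refill, in ms
  int32_t numOfToken;     // currently available tokens
  int32_t rate;           // tokens added per second
} TokenBucketSketch;

static int64_t nowMs(void) {
  struct timespec ts;
  clock_gettime(CLOCK_REALTIME, &ts);
  return (int64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

static void bucketInit(TokenBucketSketch* pBucket, int32_t cap, int32_t rate) {
  pBucket->capacity = cap;
  pBucket->rate = rate;
  pBucket->numOfToken = cap;  // start full, as streamTaskInitTokenBucket does
  pBucket->fillTimestamp = nowMs();
}

// Refill according to elapsed time, then try to consume one token.
static bool bucketTake(TokenBucketSketch* pBucket) {
  int64_t now = nowMs();
  int32_t inc = (int32_t)((now - pBucket->fillTimestamp) / 1000.0 * pBucket->rate);
  if (inc > 0) {
    pBucket->numOfToken =
        (pBucket->numOfToken + inc < pBucket->capacity) ? pBucket->numOfToken + inc : pBucket->capacity;
    pBucket->fillTimestamp = now;  // only advances when tokens were added
  }
  if (pBucket->numOfToken > 0) {
    --pBucket->numOfToken;
    return true;
  }
  return false;
}

In the stream code, each task's bucket is created with capacity 150 and rate 100 tokens/sec (see the streamTaskInit hunk below), which caps sustained sink executions at roughly 100 per second with bursts of up to 150.
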
@ -30,6 +30,13 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);

static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
  if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY && pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
    qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
           pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream,
           streamGetTaskStatusStr(pTask->status.taskStatus));
  }

  ASSERT(pTask->status.downstreamReady == 0);
  pTask->status.downstreamReady = 1;
@ -97,11 +104,8 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
      streamSetParamForScanHistory(pTask);
      streamTaskEnablePause(pTask);
    }

    streamTaskScanHistoryPrepare(pTask);
  } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
    streamTaskScanHistoryPrepare(pTask);
  }
  return 0;
}
@ -402,15 +406,6 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
  return TSDB_CODE_SUCCESS;
}

// agg
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) {
  pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
  qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
         pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream,
         streamGetTaskStatusStr(pTask->status.taskStatus));
  return 0;
}

int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
  void* exec = pTask->exec.pExecutor;
  if (pTask->info.fillHistory && qRestoreStreamOperatorOption(exec) < 0) {
@ -509,7 +504,8 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {

static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) {
  pHTask->dataRange.range.minVer = 0;
  pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer;
  // the query version range should be limited to the already processed data
  pHTask->dataRange.range.maxVer = pTask->chkInfo.nextProcessVer - 1;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    qDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64
@ -375,16 +375,17 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
    return -1;
  }

  pTask->tsInfo.init = taosGetTimestampMs();
  pTask->tsInfo.created = taosGetTimestampMs();
  pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
  pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMeta = pMeta;

  pTask->chkInfo.currentVer = ver;
  pTask->chkInfo.nextProcessVer = ver;
  pTask->dataRange.range.maxVer = ver;
  pTask->dataRange.range.minVer = ver;
  pTask->pMsgCb = pMsgCb;

  streamTaskInitTokenBucket(&pTask->tokenBucket, 150, 100);
  taosThreadMutexInit(&pTask->lock, NULL);
  streamTaskOpenAllUpstreamInput(pTask);
@ -59,7 +59,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
  ASSERT(pData->pWal != NULL);

  taosThreadMutexInit(&(pData->mutex), NULL);
  pData->pWalHandle = walOpenReader(pData->pWal, NULL);
  pData->pWalHandle = walOpenReader(pData->pWal, NULL, 0);
  ASSERT(pData->pWalHandle != NULL);

  pLogStore->syncLogUpdateCommitIndex = raftLogUpdateCommitIndex;
@ -395,7 +395,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
  if (status == 0) {
    tTrace("success to dispatch conn to work thread");
  } else {
    tError("fail to dispatch conn to work thread");
    tError("fail to dispatch conn to work thread, code:%s", uv_strerror(status));
  }
  if (!uv_is_closing((uv_handle_t*)req->data)) {
    uv_close((uv_handle_t*)req->data, uvFreeCb);
@ -16,7 +16,7 @@
#include "taoserror.h"
#include "walInt.h"

SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond, int64_t id) {
  SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader));
  if (pReader == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -24,7 +24,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
  }

  pReader->pWal = pWal;
  pReader->readerId = tGenIdPI64();
  pReader->readerId = (id != 0) ? id : tGenIdPI64();
  pReader->pIdxFile = NULL;
  pReader->pLogFile = NULL;
  pReader->curVersion = -1;
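
The new third parameter lets a caller thread an externally known identifier through to every WAL log line, while passing 0 preserves the old behavior of generating a fresh reader id. A hedged usage sketch (pVnode and the task id field are illustrative; only the walOpenReader signature comes from this patch):

// Reuse an id already known to the caller (e.g. a stream task id) so WAL
// reads can be correlated with task-level logs; pass 0 to auto-generate.
SWalReader* pReader = walOpenReader(pVnode->pWal, NULL, pTask->id.taskId);
if (pReader == NULL) {
  return -1;  // terrno carries the reason, e.g. TSDB_CODE_OUT_OF_MEMORY
}
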
@ -75,6 +75,7 @@ int32_t walNextValidMsg(SWalReader *pReader) {
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  while (fetchVer <= appliedVer) {
    if (walFetchHead(pReader, fetchVer) < 0) {
      return -1;
@ -257,9 +258,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
  bool seeked = false;

  wDebug("vgId:%d, try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
         ", applied ver:%" PRId64,
         ", applied ver:%" PRId64 ", 0x%" PRIx64,
         pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
         pRead->pWal->vers.appliedVer);
         pRead->pWal->vers.appliedVer, pRead->readerId);

  // TODO: valid ver
  if (ver > pRead->pWal->vers.commitVer) {
@ -297,7 +298,8 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
  code = walValidHeadCksum(pRead->pHead);

  if (code != 0) {
    wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
    wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed, 0x%" PRIx64,
           pRead->pWal->cfg.vgId, ver, pRead->readerId);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }
@ -307,9 +309,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {

int32_t walSkipFetchBody(SWalReader *pRead) {
  wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
         ", applied ver:%" PRId64,
         ", applied ver:%" PRId64 ", 0x%" PRIx64,
         pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
         pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer);
         pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId);

  int64_t code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
  if (code < 0) {
@ -324,11 +326,13 @@ int32_t walSkipFetchBody(SWalReader *pRead) {
int32_t walFetchBody(SWalReader *pRead) {
  SWalCont *pReadHead = &pRead->pHead->head;
  int64_t   ver = pReadHead->version;
  int32_t   vgId = pRead->pWal->cfg.vgId;
  int64_t   id = pRead->readerId;

  wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
         ", applied ver:%" PRId64,
         pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
         pRead->pWal->vers.appliedVer);
         ", applied ver:%" PRId64 ", 0x%" PRIx64,
         vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
         pRead->pWal->vers.appliedVer, id);

  if (pRead->capacity < pReadHead->bodyLen) {
    SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
@ -344,26 +348,25 @@ int32_t walFetchBody(SWalReader *pRead) {
  if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
    if (pReadHead->bodyLen < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
             pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno));
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s, 0x%" PRIx64,
             vgId, pReadHead->version, ver, tstrerror(terrno), id);
    } else {
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted",
             pRead->pWal->cfg.vgId, pReadHead->version, ver);
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted, 0x%" PRIx64,
             vgId, pReadHead->version, ver, id);
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    return -1;
  }

  if (pReadHead->version != ver) {
    wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
           pReadHead->version, ver);
    wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64 ", 0x%" PRIx64, vgId,
           pReadHead->version, ver, id);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  if (walValidBodyCksum(pRead->pHead) != 0) {
    wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
           ver);
    wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed, 0x%" PRIx64, vgId, ver, id);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }
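
For orientation, these three functions form a small fetch protocol: walFetchHead seeks to a version and validates the header checksum, after which the caller either consumes the payload with walFetchBody or skips it with walSkipFetchBody; the 0x-prefixed readerId now printed in every message makes interleaved readers distinguishable in the logs. A sketch of a consuming loop (shouldProcess is a hypothetical caller-side filter; the three wal* signatures come from this patch):

// Scan the closed range [startVer, endVer]; on any failure terrno has
// already been set by the wal* call, so just propagate -1.
static int32_t scanWal(SWalReader* pReader, int64_t startVer, int64_t endVer) {
  for (int64_t ver = startVer; ver <= endVer; ++ver) {
    if (walFetchHead(pReader, ver) < 0) {
      return -1;
    }
    if (shouldProcess(&pReader->pHead->head)) {  // caller-defined filtering
      if (walFetchBody(pReader) < 0) return -1;
      // pReader->pHead->head.body now holds bodyLen bytes for this version
    } else {
      if (walSkipFetchBody(pReader) < 0) return -1;
    }
  }
  return 0;
}
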
@ -326,7 +326,7 @@ TEST_F(WalCleanDeleteEnv, roll) {
TEST_F(WalKeepEnv, readHandleRead) {
  walResetEnv();
  int code;
  SWalReader* pRead = walOpenReader(pWal, NULL);
  SWalReader* pRead = walOpenReader(pWal, NULL, 0);
  ASSERT(pRead != NULL);

  int i;
@ -387,7 +387,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {

  ASSERT_EQ(pWal->vers.lastVer, 99);

  SWalReader* pRead = walOpenReader(pWal, NULL);
  SWalReader* pRead = walOpenReader(pWal, NULL, 0);
  ASSERT(pRead != NULL);

  for (int i = 0; i < 1000; i++) {
@ -6,7 +6,8 @@ from util.cases import *
from util.common import *

class TDTestCase:
    updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
    updatecfgDict = {'vdebugFlag': 143, 'qdebugflag':135, 'tqdebugflag':135, 'udebugflag':135, 'rpcdebugflag':135,
                     'asynclog': 0}
    def init(self, conn, logSql, replicaVar=1):
        self.replicaVar = int(replicaVar)
        tdLog.debug("start to execute %s" % __file__)