Merge pull request #27882 from taosdata/fix/3_liaohj

refactor: check return value for stream.
This commit is contained in:
Haojun Liao 2024-09-14 18:51:59 +08:00 committed by GitHub
commit 1a4c09cffa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 245 additions and 172 deletions

View File

@ -74,7 +74,7 @@ typedef enum {
* @param vgId
* @return
*/
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId);
int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pInfo, void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId);
/**
* Create the exec task for queue mode
@ -93,7 +93,7 @@ int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList
* @param taskId
* @param queryId
*/
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);

View File

@ -173,7 +173,7 @@ typedef struct TsdReader {
int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, void** ppReader, const char* idstr, SHashObj** pIgnoreTables);
void (*tsdReaderClose)();
void (*tsdSetReaderTaskId)(void *pReader, const char *pId);
int32_t (*tsdSetReaderTaskId)(void *pReader, const char *pId);
int32_t (*tsdSetQueryTableList)();
int32_t (*tsdNextDataBlock)();

View File

@ -673,7 +673,7 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt);
int32_t streamExecTask(SStreamTask* pTask);
void streamResumeTask(SStreamTask* pTask);
int32_t streamResumeTask(SStreamTask* pTask);
int32_t streamTrySchedExec(SStreamTask* pTask);
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType);
void streamTaskResumeInFuture(SStreamTask* pTask);

View File

@ -978,6 +978,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_STREAM_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x4105)
#define TSDB_CODE_STREAM_CONFLICT_EVENT TAOS_DEF_ERROR_CODE(0, 0x4106)
#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4107)
#define TSDB_CODE_STREAM_INPUTQ_FULL TAOS_DEF_ERROR_CODE(0, 0x4108)
// TDLite
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)

View File

@ -164,7 +164,7 @@ typedef struct STsdbReader STsdbReader;
int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, void **ppReader, const char *idstr, SHashObj **pIgnoreTables);
int32_t tsdbSetTableList2(STsdbReader *pReader, const void *pTableList, int32_t num);
void tsdbReaderSetId2(STsdbReader *pReader, const char *idstr);
int32_t tsdbReaderSetId(void *pReader, const char *idstr);
void tsdbReaderClose2(STsdbReader *pReader);
int32_t tsdbNextDataBlock2(STsdbReader *pReader, bool *hasNext);
int32_t tsdbRetrieveDatablockSMA2(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave, bool *hasNullSMA);

View File

@ -314,8 +314,9 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .skipRollup = 1, .pStateBackend = pStreamState};
initStorageAPI(&handle.api);
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0);
if (!pRSmaInfo->taskInfo[idx]) {
code = qCreateStreamExecTaskInfo(&pRSmaInfo->taskInfo[idx], param->qmsg[idx], &handle, TD_VID(pVnode), 0);
if (!pRSmaInfo->taskInfo[idx] || (code != 0)) {
TAOS_RETURN(TSDB_CODE_RSMA_QTASKINFO_CREATE);
}

View File

@ -102,7 +102,6 @@ int32_t tqOpen(const char* path, SVnode* pVnode) {
int32_t tqInitialize(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
int32_t code = streamMetaOpen(pTq->path, pTq, tqBuildStreamTask, tqExpandStreamTask, vgId, -1,
tqStartTaskCompleteCallback, &pTq->pStreamMeta);
if (code != TSDB_CODE_SUCCESS) {
@ -110,7 +109,6 @@ int32_t tqInitialize(STQ* pTq) {
}
streamMetaLoadAllTasks(pTq->pStreamMeta);
return tqMetaOpen(pTq);
}
@ -713,8 +711,7 @@ end:
static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) {
STQ* pTq = (STQ*)pTqObj;
STQ* pTq = (STQ*)pTqObj;
int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("s-task:0x%x start to build task", pTask->id.taskId);
@ -744,16 +741,25 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
SSchemaWrapper* pschemaWrapper = pOutputInfo->tbSink.pSchemaWrapper;
pOutputInfo->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
if (pOutputInfo->tbSink.pTSchema == NULL) {
return -1;
return terrno;
}
pOutputInfo->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
if (pOutputInfo->tbSink.pTblInfo == NULL) {
tqError("vgId:%d failed init sink tableInfo, code:%s", vgId, tstrerror(terrno));
return terrno;
}
tSimpleHashSetFreeFp(pOutputInfo->tbSink.pTblInfo, freePtr);
}
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId);
if (pTask->exec.pWalReader == NULL) {
tqError("vgId:%d failed init wal reader, code:%s", vgId, tstrerror(terrno));
return terrno;
}
}
streamTaskResetUpstreamStageInfo(pTask);
@ -1007,9 +1013,13 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
if (code) {
tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
}
// let's continue scan data in the wal files
if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) {
if (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK) {
code = tqScanWalAsync(pTq, false); // it's ok to failed
if (code) {
tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));

View File

@ -351,11 +351,8 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then
// retry
code = TSDB_CODE_OUT_OF_MEMORY;
terrno = code;
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
return code;
return terrno;
}
(void)memcpy(data, pBody, len);

View File

@ -23,7 +23,7 @@ static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
static bool taskReadyForDataFromWal(SStreamTask* pTask);
static bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems);
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration);
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
@ -40,7 +40,7 @@ int32_t tqScanWal(STQ* pTq) {
int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle);
if (code) {
tqError("vgId:%d failed to start all tasks, try next time", vgId);
tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code));
return code;
}
@ -293,9 +293,11 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) {
return true;
}
bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) {
int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc) {
const char* id = pTask->id.idStr;
int32_t numOfNewItems = 0;
int32_t code = 0;
*pSucc = false;
while (1) {
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
@ -304,7 +306,7 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems
}
SStreamQueueItem* pItem = NULL;
int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
if (code != TSDB_CODE_SUCCESS || pItem == NULL) { // failed, continue
int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
bool itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
@ -327,10 +329,17 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems
break;
}
} else {
tqTrace("s-task:%s append input queue failed, code:too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer);
code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
if (code) {
tqError("s-task:%s failed to seek ver to:%"PRId64 " in wal", id, pTask->chkInfo.nextProcessVer);
if (code == TSDB_CODE_OUT_OF_MEMORY) {
tqError("s-task:%s failed to put data into inputQ, since out of memory", id);
} else {
tqTrace("s-task:%s append input queue failed, code:inputQ is full, ver:%" PRId64, id,
pTask->chkInfo.nextProcessVer);
code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
if (code) {
tqError("s-task:%s failed to seek ver to:%" PRId64 " in wal", id, pTask->chkInfo.nextProcessVer);
}
code = 0; // reset the error code
}
break;
@ -339,7 +348,8 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems
}
*numOfItems += numOfNewItems;
return numOfNewItems > 0;
*pSucc = (numOfNewItems > 0);
return code;
}
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
@ -358,6 +368,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
streamMetaWUnLock(pStreamMeta);
if (pTaskList == NULL) {
tqError("vgId:%d failed to create task list dup, code:%s", vgId, tstrerror(terrno));
return terrno;
}
@ -405,7 +416,8 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}
bool hasNewData = doPutDataIntoInputQ(pTask, maxVer, &numOfItems);
bool hasNewData = false;
code = doPutDataIntoInputQ(pTask, maxVer, &numOfItems, &hasNewData);
streamMutexUnlock(&pTask->lock);
if ((numOfItems > 0) || hasNewData) {

View File

@ -145,12 +145,13 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
terrno = 0;
SMqDataRsp dataRsp = {0};
int code = tqInitDataRsp(&dataRsp.common, *pOffset);
int code = tqInitDataRsp(&dataRsp.common, *pOffset);
if (code != 0) {
goto end;
}
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
goto end;

View File

@ -36,6 +36,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
int64_t st = taosGetTimestampMs();
int64_t streamId = 0;
int32_t taskId = 0;
int32_t code = 0;
tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId);
@ -52,7 +53,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
pTask->pState = streamStateOpen(pMeta->path, pTask, streamId, taskId);
if (pTask->pState == NULL) {
tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
return -1;
return terrno;
} else {
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
@ -75,18 +76,23 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
initStorageAPI(&handle.api);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG) {
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
if (pTask->exec.pExecutor == NULL) {
tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr);
return -1;
code = qCreateStreamExecTaskInfo(&pTask->exec.pExecutor, pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
if (code) {
tqError("s-task:%s failed to expand task, code:%s", pTask->id.idStr, tstrerror(code));
return code;
}
code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
if (code) {
return code;
}
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
}
double el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, el);
return TSDB_CODE_SUCCESS;
return code;
}
void tqSetRestoreVersionInfo(SStreamTask* pTask) {
@ -840,7 +846,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t idle = taosGetTimestampMs() - execTs;
tqDebug("s-task:%s task resume to run after idle for:%dms from:%" PRId64, pTask->id.idStr, idle, execTs);
streamResumeTask(pTask);
code = streamResumeTask(pTask);
} else {
int8_t status = streamTaskSetSchedStatusInactive(pTask);
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
@ -849,7 +855,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
streamMetaReleaseTask(pMeta, pTask);
}
return 0;
return code;
}
SStreamTask* pTask = NULL;

View File

@ -5986,13 +5986,18 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact
}
// if failed, do nothing
void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) {
int32_t tsdbReaderSetId(void* p, const char* idstr) {
STsdbReader* pReader = (STsdbReader*) p;
taosMemoryFreeClear(pReader->idStr);
pReader->idStr = taosStrdup(idstr);
if (pReader->idStr == NULL) {
// no need to do anything
tsdbError("%s failed to build reader id, code:%s", idstr, tstrerror(terrno));
return terrno;
}
pReader->status.fileIter.pSttBlockReader->mergeTree.idStr = pReader->idStr;
return 0;
}
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ }

View File

@ -59,7 +59,7 @@ void initTsdbReaderAPI(TsdReader* pReader) {
pReader->tsdReaderGetNumOfInMemRows = tsdbGetNumOfRowsInMemTable2; // todo this function should be moved away
pReader->tsdSetQueryTableList = tsdbSetTableList2;
pReader->tsdSetReaderTaskId = (void (*)(void*, const char*))tsdbReaderSetId2;
pReader->tsdSetReaderTaskId = tsdbReaderSetId;
pReader->tsdSetFilesetDelimited = (void (*)(void*))tsdbSetFilesetDelimited;
pReader->tsdSetSetNotifyCb = (void (*)(void*, TsdReaderNotifyCbFn, void*))tsdbReaderSetNotifyCb;

View File

@ -204,28 +204,34 @@ _end:
return code;
}
void doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) {
int32_t doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pStreamScanInfo = pOperator->info;
if (pStreamScanInfo->pTableScanOp != NULL) {
STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info;
if (pScanInfo->base.dataReader != NULL) {
pAPI->tsdReader.tsdSetReaderTaskId(pScanInfo->base.dataReader, pTaskInfo->id.str);
int32_t code = pAPI->tsdReader.tsdSetReaderTaskId(pScanInfo->base.dataReader, pTaskInfo->id.str);
if (code) {
qError("failed to set reader id for executor, code:%s", tstrerror(code));
return code;
}
}
}
} else {
doSetTaskId(pOperator->pDownstream[0], pAPI);
return doSetTaskId(pOperator->pDownstream[0], pAPI);
}
return 0;
}
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
SExecTaskInfo* pTaskInfo = tinfo;
pTaskInfo->id.queryId = queryId;
buildTaskId(taskId, queryId, pTaskInfo->id.str);
// set the idstr for tsdbReader
doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI);
return doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI);
}
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
@ -337,33 +343,31 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
return pTaskInfo;
}
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId) {
int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pTaskInfo, void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId) {
if (msg == NULL) {
return NULL;
return TSDB_CODE_INVALID_PARA;
}
*pTaskInfo = NULL;
SSubplan* pPlan = NULL;
int32_t code = qStringToSubplan(msg, &pPlan);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
return code;
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM);
code = qCreateExecTask(readers, vgId, taskId, pPlan, pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM);
if (code != TSDB_CODE_SUCCESS) {
qDestroyTask(pTaskInfo);
terrno = code;
return NULL;
qDestroyTask(*pTaskInfo);
return code;
}
code = qStreamInfoResetTimewindowFilter(pTaskInfo);
code = qStreamInfoResetTimewindowFilter(*pTaskInfo);
if (code != TSDB_CODE_SUCCESS) {
qDestroyTask(pTaskInfo);
terrno = code;
return NULL;
qDestroyTask(*pTaskInfo);
}
return pTaskInfo;
return code;
}
static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr,
@ -627,9 +631,13 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
// pSinkParam has been freed during create sinker.
code = dsCreateDataSinker(pSinkManager, pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
if (code) {
qError("s-task:%s failed to create data sinker, code:%s", (*pTask)->id.str, tstrerror(code));
}
}
qDebug("subplan task create completed, TID:0x%" PRIx64 "QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64 " code:%s", taskId, pSubplan->id.queryId,
tstrerror(code));
_error:
// if failed to add ref for all tables in this query, abort current query

View File

@ -394,7 +394,6 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
}
}
//pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
code = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOperator);
if (code) {
pTaskInfo->code = code;

View File

@ -3894,8 +3894,8 @@ static void destroyStreamScanOperatorInfo(void* param) {
if (param == NULL) {
return;
}
SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
destroyOperator(pStreamScan->pTableScanOp);
}
@ -3914,7 +3914,10 @@ static void destroyStreamScanOperatorInfo(void* param) {
cleanupExprSupp(&pStreamScan->tbnameCalSup);
cleanupExprSupp(&pStreamScan->tagCalSup);
pStreamScan->stateStore.updateInfoDestroy(pStreamScan->pUpdateInfo);
if (pStreamScan->stateStore.updateInfoDestroy) {
pStreamScan->stateStore.updateInfoDestroy(pStreamScan->pUpdateInfo);
}
blockDataDestroy(pStreamScan->pRes);
blockDataDestroy(pStreamScan->pUpdateRes);
blockDataDestroy(pStreamScan->pDeleteDataRes);
@ -4127,16 +4130,13 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
}
pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
if (pInfo->pBlockLists == NULL) {
code = terrno;
goto _error;
}
TSDB_CHECK_NULL(pInfo->pBlockLists, code, lino, _error, terrno);
if (pHandle->vnode) {
SOperatorInfo* pTableScanOp = NULL;
code = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo, &pTableScanOp);
if (pTableScanOp == NULL || code != 0) {
qError("createTableScanOperatorInfo error, errorcode: %d", pTaskInfo->code);
qError("createTableScanOperatorInfo error, code:%d", pTaskInfo->code);
goto _error;
}
@ -4180,6 +4180,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
// set the extract column id to streamHandle
pAPI->tqReaderFn.tqReaderSetColIdList(pInfo->tqReader, pColIds);
SArray* tableIdList = NULL;
code = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo, &tableIdList);
QUERY_CHECK_CODE(code, lino, _error);
@ -4189,9 +4190,11 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
} else {
taosArrayDestroy(pColIds);
tableListDestroy(pTableListInfo);
pColIds = NULL;
}
// clear the local variable to avoid repeatly free
pColIds = NULL;
// create the pseduo columns info
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
code = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->pPseudoExpr, &pInfo->numOfPseudoExpr);
@ -4268,6 +4271,10 @@ _error:
}
if (pInfo != NULL) {
STableScanInfo* p = (STableScanInfo*) pInfo->pTableScanOp->info;
if (p != NULL) {
p->base.pTableListInfo = NULL;
}
destroyStreamScanOperatorInfo(pInfo);
}

View File

@ -238,7 +238,7 @@ void initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, int32_t up
int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32_t upstreamTaskId, int32_t childId,
int64_t checkpointId, SRpcMsg* pMsg);
void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock);
int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock);
#ifdef __cplusplus

View File

@ -2573,11 +2573,15 @@ void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) {
}
STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
char* err = NULL;
char** cfNames = NULL;
size_t nCf = 0;
char* err = NULL;
char** cfNames = NULL;
size_t nCf = 0;
int32_t code = 0;
int32_t lino = 0;
STaskDbWrapper* pTaskDb = taosMemoryCalloc(1, sizeof(STaskDbWrapper));
TSDB_CHECK_NULL(pTaskDb, code, lino, _EXIT, terrno);
pTaskDb->idstr = key ? taosStrdup(key) : NULL;
pTaskDb->path = statePath ? taosStrdup(statePath) : NULL;
@ -2592,6 +2596,7 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err);
if (pTaskDb->db == NULL) {
stError("%s open state-backend failed, reason:%s", key, err);
code = TSDB_CODE_STREAM_INTERNAL_ERROR;
goto _EXIT;
}
@ -2608,11 +2613,12 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
if (err != NULL) {
stError("%s failed to create column-family, %s, %" PRIzu ", reason:%s", key, dbPath, nCf, err);
code = TSDB_CODE_STREAM_INTERNAL_ERROR;
goto _EXIT;
}
}
if (taskDbOpenCfs(pTaskDb, dbPath, cfNames, nCf) != 0) {
if ((code = taskDbOpenCfs(pTaskDb, dbPath, cfNames, nCf)) != 0) {
goto _EXIT;
}
@ -2625,6 +2631,8 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
return pTaskDb;
_EXIT:
stError("%s taskDb open failed, %s at line:%d code:%s", key, __func__, lino, tstrerror(code));
taskDbDestroy(pTaskDb, false);
if (err) taosMemoryFree(err);
if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf);

View File

@ -21,7 +21,7 @@
#define CHECK_NOT_RSP_DURATION 10 * 1000 // 10 sec
static void processDownstreamReadyRsp(SStreamTask* pTask);
static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId);
static int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId);
static void rspMonitorFn(void* param, void* tmrId);
static void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
@ -226,13 +226,13 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart",
id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
code = addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
} else {
stError(
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
"downstream again, nodeUpdate needed",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
code = addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
}
streamMetaAddFailedTaskSelf(pTask, now);
@ -258,6 +258,11 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa
}
void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
if (buf == NULL) {
stError("s-task:0x%x vgId:%d failed prepare msg, %s at line:%d code:%s", taskId, pMeta->vgId, __func__, __LINE__, tstrerror(code));
return terrno;
}
((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
@ -268,7 +273,7 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
tmsgSendRsp(&rspMsg);
code = (code >= 0)? 0:code;
code = TMIN(code, 0);
return code;
}
@ -371,12 +376,14 @@ void processDownstreamReadyRsp(SStreamTask* pTask) {
}
}
void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
int32_t vgId = pTask->pMeta->vgId;
int32_t code = 0;;
bool existed = false;
streamMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);
bool existed = false;
for (int i = 0; i < num; ++i) {
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, i);
if (p == NULL) {
@ -391,15 +398,19 @@ void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
if (!existed) {
SDownstreamTaskEpset t = {.nodeId = nodeId};
void* p = taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t);
void* p = taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t);
if (p == NULL) {
// todo let's retry
code = terrno;
stError("s-task:%s vgId:%d failed to update epset, code:%s", pTask->id.idStr, vgId, tstrerror(code));
} else {
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr,
vgId, t.nodeId, (num + 1));
}
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId,
t.nodeId, (num + 1));
}
streamMutexUnlock(&pTask->lock);
return code;
}
void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
@ -629,6 +640,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
int32_t numOfTimeout = taosArrayGetSize(pTimeoutList);
int32_t code = 0;
pInfo->timeoutStartTs = taosGetTimestampMs();
for (int32_t i = 0; i < numOfTimeout; ++i) {
@ -640,14 +652,13 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
int32_t taskId = *px;
SDownstreamStatusInfo* p = NULL;
findCheckRspStatus(pInfo, taskId, &p);
if (p != NULL) {
if (p->status != -1 || p->rspTs != 0) {
stError("s-task:%s invalid rsp record entry, index:%d, status:%d, rspTs:%" PRId64, pTask->id.idStr, i,
p->status, p->rspTs);
stError("s-task:%s invalid rsp record entry, index:%d, status:%d, rspTs:%" PRId64, id, i, p->status, p->rspTs);
continue;
}
int32_t code = doSendCheckMsg(pTask, p);
code = doSendCheckMsg(pTask, p);
}
}
@ -666,7 +677,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
SDownstreamStatusInfo* p = NULL;
findCheckRspStatus(pInfo, *pTaskId, &p);
if (p != NULL) {
addIntoNodeUpdateList(pTask, p->vgId);
code = addIntoNodeUpdateList(pTask, p->vgId);
stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list",
id, vgId, p->taskId, p->vgId);
}

View File

@ -365,7 +365,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
// The transfer of state may generate new data that need to dispatch to downstream tasks,
// Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed
// before the next checkpoint.
flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
if (code) {
return code;
}
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
@ -398,7 +401,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
code = streamTaskBuildCheckpoint(pTask); // todo: not handle error yet
} else { // source & agg tasks need to forward the checkpoint msg downwards
stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);
flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
if (code) {
return code;
}
// Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by
// this task already. And then, dispatch check point msg to all downstream tasks

View File

@ -24,7 +24,7 @@
#define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
static void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks);
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks);
bool streamTaskShouldStop(const SStreamTask* pTask) {
SStreamTaskState pState = streamTaskGetStatus(pTask);
@ -95,7 +95,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
return code;
}
void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor;
int32_t size = 0;
@ -112,7 +112,7 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to
if (streamTaskShouldStop(pTask) || (pRes == NULL)) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return;
return code;
}
SSDataBlock* output = NULL;
@ -122,8 +122,13 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to
resetTaskInfo(pExecutor);
}
stError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, tstrerror(code));
continue;
if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) {
stFatal("s-task:%s failed to continue execute since %s", pTask->id.idStr, tstrerror(code));
return code;
} else {
qResetTaskCode(pExecutor);
continue;
}
}
if (output == NULL) {
@ -194,7 +199,7 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
// todo: here we need continue retry to put it into output buffer
if (code != TSDB_CODE_SUCCESS) {
return;
return code;
}
pRes = NULL;
@ -208,6 +213,8 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to
} else {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
}
return code;
}
// todo contiuous try to create result blocks
@ -627,7 +634,7 @@ static void doRecordThroughput(STaskExecStatisInfo* pInfo, int64_t totalBlocks,
}
}
static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) {
static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) {
const char* id = pTask->id.idStr;
int32_t blockSize = 0;
int64_t st = taosGetTimestampMs();
@ -635,23 +642,28 @@ static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, i
int64_t ver = pInfo->processedVer;
int64_t totalSize = 0;
int32_t totalBlocks = 0;
int32_t code = 0;
stDebug("s-task:%s start to process batch blocks, num:%d, type:%s", id, num, streamQueueItemGetTypeStr(pBlock->type));
int32_t code = doSetStreamInputBlock(pTask, pBlock, &ver, id);
code = doSetStreamInputBlock(pTask, pBlock, &ver, id);
if (code) {
stError("s-task:%s failed to set input block, not exec for these blocks", id);
return;
return code;
}
code = streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks);
if (code) {
return code;
}
streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks);
doRecordThroughput(&pTask->execInfo, totalBlocks, totalSize, blockSize, st, pTask->id.idStr);
// update the currentVer if processing the submit blocks.
if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) {
stError("s-task:%s invalid info, checkpointVer:%" PRId64 ", nextProcessVer:%" PRId64 " currentVer:%" PRId64, id,
pInfo->checkpointVer, pInfo->nextProcessVer, ver);
return;
return code;
}
if (ver != pInfo->processedVer) {
@ -660,9 +672,11 @@ static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, i
id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
pInfo->processedVer = ver;
}
return code;
}
void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) {
int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) {
const char* id = pTask->id.idStr;
// 1. transfer the ownership of executor state
@ -703,7 +717,12 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB
}
// 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V.
doStreamTaskExecImpl(pTask, pCheckpointBlock, 1);
int32_t code = doStreamTaskExecImpl(pTask, pCheckpointBlock, 1);
if(code) {
stError("s-task:%s failed to exec stream task before checkpoint, code:%s", id, tstrerror(code));
}
return code;
}
/**
@ -712,6 +731,7 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB
*/
static int32_t doStreamExecTask(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
int32_t code = 0;
// merge multiple input data if possible in the input queue.
stDebug("s-task:%s start to extract data block from inputQ", id);
@ -784,9 +804,9 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
if (type == STREAM_INPUT__DATA_BLOCK) {
pTask->execInfo.sink.dataSize += blockSize;
stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize));
int32_t code = doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
code = doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
if (code != TSDB_CODE_SUCCESS) {
// todo handle error.
return code;
}
double el = (taosGetTimestampMs() - st) / 1000.0;
@ -801,17 +821,19 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
}
if (type != STREAM_INPUT__CHECKPOINT) {
doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
streamFreeQitem(pInput);
if (code) {
return code;
}
} else { // todo other thread may change the status
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
streamMutexLock(&pTask->lock);
SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState.state == TASK_STATUS__CK) {
stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
int32_t code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue
code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue
} else { // todo refactor
int32_t code = 0;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask);
} else {
@ -827,7 +849,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
streamMutexUnlock(&pTask->lock);
streamFreeQitem(pInput);
return 0;
return code;
}
}
}
@ -858,21 +880,21 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
}
}
void streamResumeTask(SStreamTask* pTask) {
int32_t streamResumeTask(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
int32_t code = 0;
if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) {
stError("s-task:%s invalid sched status:%d, not resume task", id, pTask->status.schedStatus);
return;
stError("s-task:%s invalid sched status:%d, not resume task", pTask->id.idStr, pTask->status.schedStatus);
return code;
}
while (1) {
code = doStreamExecTask(pTask);
if (code) {
stError("s-task:%s failed to exec stream task, code:%s", id, tstrerror(code));
return code;
}
// check if continue
streamMutexLock(&pTask->lock);
@ -888,7 +910,7 @@ void streamResumeTask(SStreamTask* pTask) {
stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p,
pTask->status.schedStatus, pTask->status.lastExecTs);
return;
return code;
} else {
// check if this task needs to be idle for a while
if (pTask->status.schedIdleTime > 0) {
@ -896,28 +918,31 @@ void streamResumeTask(SStreamTask* pTask) {
streamMutexUnlock(&pTask->lock);
setLastExecTs(pTask, taosGetTimestampMs());
return;
return code;
}
}
streamMutexUnlock(&pTask->lock);
}
return code;
}
int32_t streamExecTask(SStreamTask* pTask) {
// this function may be executed by multi-threads, so status check is required.
const char* id = pTask->id.idStr;
int32_t code = 0;
int8_t schedStatus = streamTaskSetSchedStatusActive(pTask);
if (schedStatus == TASK_SCHED_STATUS__WAITING) {
streamResumeTask(pTask);
code = streamResumeTask(pTask);
} else {
char* p = streamTaskGetStatus(pTask).name;
stDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, p,
pTask->status.schedStatus);
}
return 0;
return code;
}
int32_t streamTaskReleaseState(SStreamTask* pTask) {

View File

@ -368,8 +368,9 @@ void streamMetaRemoveDB(void* arg, char* key) {
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId,
int64_t stage, startComplete_fn_t fn, SStreamMeta** p) {
int32_t code = 0;
QRY_PARAM_CHECK(p);
int32_t code = 0;
int32_t lino = 0;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
if (pMeta == NULL) {
@ -379,23 +380,18 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
int32_t len = strlen(path) + 64;
char* tpath = taosMemoryCalloc(1, len);
if (tpath == NULL) {
code = terrno;
goto _err;
}
TSDB_CHECK_NULL(tpath, code, lino, _err, terrno);
sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream");
pMeta->path = tpath;
code = streamMetaOpenTdb(pMeta);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
TSDB_CHECK_CODE(code, lino, _err);
if ((code = streamMetaMayCvtDbFormat(pMeta)) < 0) {
stError("vgId:%d convert sub info format failed, open stream meta failed, reason: %s", pMeta->vgId,
tstrerror(terrno));
goto _err;
TSDB_CHECK_CODE(code, lino, _err);
}
if ((code = streamMetaBegin(pMeta) < 0)) {
@ -405,28 +401,17 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
if (pMeta->pTasksMap == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
TSDB_CHECK_NULL(pMeta->pTasksMap, code, lino, _err, terrno);
pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK);
if (pMeta->updateInfo.pTasks == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
TSDB_CHECK_NULL(pMeta->updateInfo.pTasks, code, lino, _err, terrno);
code = streamMetaInitStartInfo(&pMeta->startInfo);
if (code) {
goto _err;
}
TSDB_CHECK_CODE(code, lino, _err);
// task list
pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
if (pMeta->pTaskList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno);
pMeta->scanInfo.scanCounter = 0;
pMeta->vgId = vgId;
@ -440,59 +425,47 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
pMeta->startInfo.completeFn = fn;
pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
TSDB_CHECK_NULL(pMeta->pTaskDbUnique, code, lino, _err, terrno);
pMeta->numOfPausedTasks = 0;
pMeta->numOfStreamTasks = 0;
pMeta->closeFlag = false;
stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage);
pMeta->rid = taosAddRef(streamMetaId, pMeta);
// set the attribute when running on Linux OS
TdThreadRwlockAttr attr;
code = taosThreadRwlockAttrInit(&attr);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
TSDB_CHECK_CODE(code, lino, _err);
#ifdef LINUX
code = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
TSDB_CHECK_CODE(code, lino, _err);
#endif
code = taosThreadRwlockInit(&pMeta->lock, &attr);
if (code) {
goto _err;
}
TSDB_CHECK_CODE(code, lino, _err);
code = taosThreadRwlockAttrDestroy(&attr);
if (code) {
goto _err;
}
TSDB_CHECK_CODE(code, lino, _err);
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
code = metaRefMgtAdd(pMeta->vgId, pRid);
if (code) {
goto _err;
}
TSDB_CHECK_CODE(code, lino, _err);
code = createMetaHbInfo(pRid, &pMeta->pHbInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
TSDB_CHECK_CODE(code, lino, _err);
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
TSDB_CHECK_NULL(pMeta->qHandle, code, lino, _err, terrno);
code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt);
if (code != 0) {
goto _err;
}
TSDB_CHECK_CODE(code, lino, _err);
code = taosThreadMutexInit(&pMeta->backendMutex, NULL);
TSDB_CHECK_CODE(code, lino, _err);
*p = pMeta;
return code;
@ -526,9 +499,10 @@ _err:
if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt);
taosMemoryFree(pMeta);
stError("failed to open stream meta, reason:%s", tstrerror(terrno));
stError("vgId:%d failed to open stream meta, at line:%d reason:%s", vgId, lino, tstrerror(code));
return code;
}
@ -1274,7 +1248,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
void streamMetaStartHb(SStreamMeta* pMeta) {
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
if (pRid == NULL) {
stError("vgId:%d failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId);
stFatal("vgId:%d failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId);
return;
}

View File

@ -287,7 +287,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
streamDataSubmitDestroy(px);
return TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_STREAM_INPUTQ_FULL;
}
int32_t msgLen = px->submit.msgLen;
@ -312,7 +312,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
streamFreeQitem(pItem);
return TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_STREAM_INPUTQ_FULL;
}
int32_t code = taosWriteQitem(pQueue, pItem);

View File

@ -101,8 +101,10 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
stDebug("open stream state %p, %s", pState, path);
if (pState == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
@ -138,7 +140,7 @@ SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, i
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
qError("0x%x %s failed at line %d since %s", taskId, __func__, lino, tstrerror(code));
}
return NULL;
}

View File

@ -487,8 +487,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
pOutputInfo->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
if (pOutputInfo->pTokenBucket == NULL) {
stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr,
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(terrno));
return terrno;
}

View File

@ -818,6 +818,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_CONFLICT_EVENT, "Stream conflict event")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full")
// TDLite
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")