diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 0033e14a2d..cd43c5c99e 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -362,7 +362,7 @@ int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pTaskInfo, void* msg, SReadHandle return code; } - code = qStreamInfoResetTimewindowFilter(pTaskInfo); + code = qStreamInfoResetTimewindowFilter(*pTaskInfo); if (code != TSDB_CODE_SUCCESS) { qDestroyTask(*pTaskInfo); } @@ -631,9 +631,13 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, // pSinkParam has been freed during create sinker. code = dsCreateDataSinker(pSinkManager, pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str); + if (code) { + qError("s-task:%s failed to create data sinker, code:%s", (*pTask)->id.str, tstrerror(code)); + } } - qDebug("subplan task create completed, TID:0x%" PRIx64 "QID:0x%" PRIx64, taskId, pSubplan->id.queryId); + qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64 " code:%s", taskId, pSubplan->id.queryId, + tstrerror(code)); _error: // if failed to add ref for all tables in this query, abort current query diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 88b9f6bf55..fe2f3f8dfe 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -394,7 +394,6 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } } - //pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); code = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOperator); if (code) { pTaskInfo->code = code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4369b1df54..b6b5c5484e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3894,8 +3894,8 @@ static void 
destroyStreamScanOperatorInfo(void* param) { if (param == NULL) { return; } - SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param; + SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param; if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) { destroyOperator(pStreamScan->pTableScanOp); } @@ -3914,7 +3914,10 @@ static void destroyStreamScanOperatorInfo(void* param) { cleanupExprSupp(&pStreamScan->tbnameCalSup); cleanupExprSupp(&pStreamScan->tagCalSup); - pStreamScan->stateStore.updateInfoDestroy(pStreamScan->pUpdateInfo); + if (pStreamScan->stateStore.updateInfoDestroy) { + pStreamScan->stateStore.updateInfoDestroy(pStreamScan->pUpdateInfo); + } + blockDataDestroy(pStreamScan->pRes); blockDataDestroy(pStreamScan->pUpdateRes); blockDataDestroy(pStreamScan->pDeleteDataRes); @@ -4127,16 +4130,13 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* } pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData)); - if (pInfo->pBlockLists == NULL) { - code = terrno; - goto _error; - } + TSDB_CHECK_NULL(pInfo->pBlockLists, code, lino, _error, terrno); if (pHandle->vnode) { SOperatorInfo* pTableScanOp = NULL; code = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo, &pTableScanOp); if (pTableScanOp == NULL || code != 0) { - qError("createTableScanOperatorInfo error, errorcode: %d", pTaskInfo->code); + qError("createTableScanOperatorInfo error, code:%d", pTaskInfo->code); goto _error; } @@ -4180,6 +4180,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* // set the extract column id to streamHandle pAPI->tqReaderFn.tqReaderSetColIdList(pInfo->tqReader, pColIds); + SArray* tableIdList = NULL; code = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo, &tableIdList); QUERY_CHECK_CODE(code, lino, _error); @@ -4189,9 +4190,11 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* } else { 
taosArrayDestroy(pColIds); tableListDestroy(pTableListInfo); - pColIds = NULL; } + // clear the local variable to avoid freeing it repeatedly + pColIds = NULL; + // create the pseduo columns info if (pTableScanNode->scan.pScanPseudoCols != NULL) { code = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->pPseudoExpr, &pInfo->numOfPseudoExpr); @@ -4268,6 +4271,10 @@ _error: } if (pInfo != NULL) { + STableScanInfo* p = (STableScanInfo*) pInfo->pTableScanOp->info; + if (p != NULL) { + p->base.pTableListInfo = NULL; + } destroyStreamScanOperatorInfo(pInfo); } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 350bd35490..a5c5c1b775 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -238,7 +238,7 @@ void initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, int32_t up int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32_t upstreamTaskId, int32_t childId, int64_t checkpointId, SRpcMsg* pMsg); -void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock); +int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock); #ifdef __cplusplus diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d79e5eb143..d9b6671d9b 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2573,11 +2573,15 @@ void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) { } STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) { - char* err = NULL; - char** cfNames = NULL; - size_t nCf = 0; + char* err = NULL; + char** cfNames = NULL; + size_t nCf = 0; + int32_t code = 0; + int32_t lino = 0; STaskDbWrapper* pTaskDb = taosMemoryCalloc(1, sizeof(STaskDbWrapper)); + TSDB_CHECK_NULL(pTaskDb, code, lino, _EXIT, terrno); + pTaskDb->idstr = key ? 
taosStrdup(key) : NULL; pTaskDb->path = statePath ? taosStrdup(statePath) : NULL; @@ -2592,6 +2596,7 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) { pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err); if (pTaskDb->db == NULL) { stError("%s open state-backend failed, reason:%s", key, err); + code = TSDB_CODE_STREAM_INTERNAL_ERROR; goto _EXIT; } @@ -2608,11 +2613,12 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) { cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err); if (err != NULL) { stError("%s failed to create column-family, %s, %" PRIzu ", reason:%s", key, dbPath, nCf, err); + code = TSDB_CODE_STREAM_INTERNAL_ERROR; goto _EXIT; } } - if (taskDbOpenCfs(pTaskDb, dbPath, cfNames, nCf) != 0) { + if ((code = taskDbOpenCfs(pTaskDb, dbPath, cfNames, nCf)) != 0) { goto _EXIT; } @@ -2625,6 +2631,8 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) { return pTaskDb; _EXIT: + stError("%s taskDb open failed, %s at line:%d code:%s", key, __func__, lino, tstrerror(code)); + taskDbDestroy(pTaskDb, false); if (err) taosMemoryFree(err); if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 6353904b07..76e74db33f 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -258,6 +258,11 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa } void* buf = rpcMallocCont(sizeof(SMsgHead) + len); + if (buf == NULL) { + stError("s-task:0x%x vgId:%d failed prepare msg, %s at line:%d code:%s", taskId, pMeta->vgId, __func__, __LINE__, tstrerror(code)); + return terrno; + } + ((SMsgHead*)buf)->vgId = htonl(vgId); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); @@ -268,7 +273,7 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa 
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo}; tmsgSendRsp(&rspMsg); - code = (code >= 0)? 0:code; + code = TMIN(code, 0); return code; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index b0f6f45110..35d5ba4e08 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -365,7 +365,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock // The transfer of state may generate new data that need to dispatch to downstream tasks, // Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed // before the next checkpoint. - flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); + code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); + if (code) { + return code; + } if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); @@ -398,7 +401,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock code = streamTaskBuildCheckpoint(pTask); // todo: not handle error yet } else { // source & agg tasks need to forward the checkpoint msg downwards stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num); - flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); + code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); + if (code) { + return code; + } // Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by // this task already. 
And then, dispatch check point msg to all downstream tasks diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 4fc00a6f59..88e40b247b 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -676,7 +676,7 @@ static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock return code; } -void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) { +int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) { const char* id = pTask->id.idStr; // 1. transfer the ownership of executor state @@ -717,7 +717,12 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB } // 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V. - doStreamTaskExecImpl(pTask, pCheckpointBlock, 1); + int32_t code = doStreamTaskExecImpl(pTask, pCheckpointBlock, 1); + if(code) { + stError("s-task:%s failed to exec stream task before checkpoint, code:%s", id, tstrerror(code)); + } + + return code; } /**