From ecbc09663f565e649689f3ea5951afa0e7a9f335 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 9 Dec 2022 09:35:53 +0800 Subject: [PATCH 1/2] enh: tolerate exec error --- source/libs/stream/src/streamExec.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 6a83a9a4da..d20c2902f5 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -54,6 +54,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* /*ASSERT(false);*/ qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId, terrstr()); + continue; } if (output == NULL) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { From 1e51c1b49cc7bce6025f09460f4a6630eae9cf6d Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 9 Dec 2022 09:41:06 +0800 Subject: [PATCH 2/2] add debug log --- source/client/src/clientMain.c | 7 +++++-- source/client/src/clientTmq.c | 4 ++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index f2dec7217f..8f029e6061 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -21,12 +21,12 @@ #include "os.h" #include "query.h" #include "scheduler.h" +#include "tdatablock.h" #include "tglobal.h" #include "tmsg.h" #include "tref.h" #include "trpc.h" #include "version.h" -#include "tdatablock.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 @@ -178,6 +178,8 @@ void taos_free_result(TAOS_RES *res) { return; } + tscDebug("taos free res %p", res); + if (TD_RES_QUERY(res)) { SRequestObj *pRequest = (SRequestObj *)res; tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId); @@ -796,7 +798,8 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c SQuery *pQuery = pRequest->pQuery; pRequest->metric.ctgEnd = taosGetTimestampUs(); - qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, tstrerror(code)); + qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, + tstrerror(code)); if (code == TSDB_CODE_SUCCESS) { pWrapper->pCatalogReq->forceUpdate = false; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 766a94caf1..db717a4e4e 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1215,6 +1215,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); + tscDebug("consumer:%" PRId64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper); + taosWriteQitem(tmq->mqueue, pRspWrapper); tsem_post(&tmq->rspSem); @@ -1664,6 +1666,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } } + tscDebug("consumer:%" PRId64 " handle rsp %p", tmq->consumerId, rspWrapper); + if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { taosFreeQitem(rspWrapper); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;