diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index cca241a1cf..3e13eaa6e8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -816,7 +816,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL // TODO version should be assigned and refed during preprocess SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal); if (pRef == NULL) { - ASSERT(0); return -1; } int64_t ver = pRef->refVer; @@ -837,12 +836,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, NULL); - ASSERT(pHandle->execHandle.task); void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.task, &scanner); - ASSERT(scanner); pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); - ASSERT(pHandle->execHandle.pExecReader); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); @@ -875,8 +871,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { - // TODO - ASSERT(0); + return -1; } } else { /*ASSERT(pExec->consumerId == req.oldConsumerId);*/ @@ -886,8 +881,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL atomic_add_fetch_32(&pHandle->epoch, 1); taosMemoryFree(req.qmsg); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { - // TODO - ASSERT(0); + return -1; } // close handle } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 3ad01e2370..34f57bc697 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -71,17 +71,14 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { int32_t tqMetaOpen(STQ* pTq) { if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0) < 0) { - ASSERT(0); return -1; } if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0) < 0) { - ASSERT(0); return -1; } if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0) < 0) { - ASSERT(0); return -1; } @@ -197,40 +194,49 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { int32_t code; int32_t vlen; tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code); - ASSERT(code == 0); tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey), pHandle->consumerId, TD_VID(pTq->pVnode)); void* buf = taosMemoryCalloc(1, vlen); if (buf == NULL) { - ASSERT(0); + return -1; } SEncoder encoder; tEncoderInit(&encoder, buf, vlen); if (tEncodeSTqHandle(&encoder, pHandle) < 0) { - ASSERT(0); + tEncoderClear(&encoder); + taosMemoryFree(buf); + return -1; } TXN* txn; if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - ASSERT(0); + tEncoderClear(&encoder); + taosMemoryFree(buf); + return -1; } if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn) < 0) { - ASSERT(0); + tEncoderClear(&encoder); + taosMemoryFree(buf); + return -1; } if (tdbCommit(pTq->pMetaDB, txn) < 0) { - ASSERT(0); + tEncoderClear(&encoder); + taosMemoryFree(buf); + return -1; } if (tdbPostCommit(pTq->pMetaDB, txn) < 0) { - ASSERT(0); + tEncoderClear(&encoder); + taosMemoryFree(buf); + return -1; } tEncoderClear(&encoder);