diff --git a/include/common/tcommon.h b/include/common/tcommon.h index f74795a250..aad69862f0 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -195,7 +195,7 @@ typedef struct SDataBlockInfo { uint32_t capacity; SBlockID id; int16_t hasVarCol; - int16_t dataLoad; // denote if the data is loaded or not + int16_t dataLoad; // denote if the data is loaded or not // TODO: optimize and remove following int64_t version; // used for stream, and need serialization @@ -204,8 +204,9 @@ typedef struct SDataBlockInfo { STimeWindow calWin; // used for stream, do not serialize TSKEY watermark; // used for stream - char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition - STag* pTag; // used for stream partition + char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition + int32_t tagLen; + void* pTag; // used for stream partition } SDataBlockInfo; typedef struct SSDataBlock { @@ -239,22 +240,22 @@ typedef struct SVarColAttr { // pBlockAgg->numOfNull == info.rows, all data are null // pBlockAgg->numOfNull == 0, no data are null. typedef struct SColumnInfoData { - char* pData; // the corresponding block data in memory + char* pData; // the corresponding block data in memory union { char* nullbitmap; // bitmap, one bit for each item in the list SVarColAttr varmeta; }; - SColumnInfo info; // column info - bool hasNull; // if current column data has null value. + SColumnInfo info; // column info + bool hasNull; // if current column data has null value. } SColumnInfoData; typedef struct SQueryTableDataCond { uint64_t suid; - int32_t order; // desc|asc order to iterate the data block + int32_t order; // desc|asc order to iterate the data block int32_t numOfCols; SColumnInfo* colList; - int32_t* pSlotList; // the column output destation slot, and it may be null - int32_t type; // data block load type: + int32_t* pSlotList; // the column output destation slot, and it may be null + int32_t type; // data block load type: STimeWindow twindows; int64_t startVersion; int64_t endVersion; diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 8fdac0da7f..d7bc151ecc 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -35,6 +35,7 @@ typedef struct STdbState { TTB* pFillStateDb; // todo refactor TTB* pSessionStateDb; TTB* pParNameDb; + TTB* pParTagDb; TXN* txn; } STdbState; @@ -108,6 +109,9 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname); int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal); +int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen); +int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen); + #if 0 char* streamStateSessionDump(SStreamState* pState); #endif diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index c63ddea25d..1dd8117c1e 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -530,7 +530,6 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) { char* topic; int32_t vgId; - ASSERT(msg != NULL); if (TD_RES_TMQ(msg)) { SMqRspObj* pRspObj = (SMqRspObj*)msg; topic = pRspObj->topic; @@ -809,8 +808,6 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId, tmqMgmt.timer, &tmq->commitTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { - } else { - ASSERT(0); } taosFreeQitem(pTaskType); } @@ -953,10 +950,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user; const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass; - ASSERT(user); - ASSERT(pass); - ASSERT(conf->groupId[0]); - pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); pTmq->mqueue = taosOpenQueue(); pTmq->qall = taosAllocateQall(); @@ -1247,8 +1240,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp); tDecoderClear(&decoder); memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead)); - } else { - ASSERT(0); } taosMemoryFree(pMsg->pData); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 37e2c35225..cf2386348e 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -133,7 +133,10 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId); - ASSERT(pConsumer); + if (pConsumer == NULL) { + mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId); + return -1; + } mInfo("receive consumer recover msg, consumer id %" PRId64 ", status %s", pRecoverMsg->consumerId, mndConsumerStatusName(pConsumer->status)); @@ -381,8 +384,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { return -1; } - ASSERT(strcmp(req.cgroup, pConsumer->cgroup) == 0); - atomic_store_32(&pConsumer->hbStatus, 0); // 1. check consumer status @@ -428,9 +429,8 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { for (int32_t i = 0; i < numOfTopics; i++) { char *topic = taosArrayGetP(pConsumer->currentTopics, i); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); - // txn guarantees pSub is created - ASSERT(pSub); + taosRLockLatch(&pSub->lock); SMqSubTopicEp topicEp = {0}; @@ -438,7 +438,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { // 2.1 fetch topic schema SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); - ASSERT(pTopic); taosRLockLatch(&pTopic->lock); tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN); topicEp.schema.nCols = pTopic->schema.nCols; @@ -775,8 +774,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, taosWLockLatch(&pOldConsumer->lock); if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) { - ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0); - ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0); + /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/ + /*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/ if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) { pOldConsumer->status = MQ_CONSUMER_STATUS__READY; @@ -798,8 +797,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY; } } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) { - ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0); - ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0); + /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/ + /*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/ int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics); /*pOldConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));*/ @@ -812,8 +811,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->status = MQ_CONSUMER_STATUS__LOST; } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) { - ASSERT(taosArrayGetSize(pOldConsumer->currentTopics) == 0); - ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0); + /*A(taosArrayGetSize(pOldConsumer->currentTopics) == 0);*/ + /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/ int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); for (int32_t i = 0; i < sz; i++) { @@ -830,15 +829,15 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->rebalanceTime = pNewConsumer->upTime; } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) { - ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1); - ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0); + /*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1);*/ + /*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);*/ char *addedTopic = strdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0)); // not exist in current topic -#if 1 +#if 0 for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->currentTopics); i++) { char *topic = taosArrayGetP(pOldConsumer->currentTopics, i); - ASSERT(strcmp(topic, addedTopic) != 0); + A(strcmp(topic, addedTopic) != 0); } #endif @@ -879,15 +878,15 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, atomic_add_fetch_32(&pOldConsumer->epoch, 1); } else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) { - ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0); - ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1); + /*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);*/ + /*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);*/ char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0); // not exist in new topic -#if 1 +#if 0 for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) { char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i); - ASSERT(strcmp(topic, removedTopic) != 0); + A(strcmp(topic, removedTopic) != 0); } #endif @@ -913,7 +912,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, } } // must find the topic - ASSERT(i < sz); + /*A(i < sz);*/ // set status if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) { diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index af1a29def0..bdef8000bd 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -115,13 +115,11 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream if (pStream->fixedSinkVgId == 0) { SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); - ASSERT(pDb); if (pDb->cfg.numOfVgroups > 1) { isShuffle = true; pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH; pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { - ASSERT(0); return -1; } } @@ -140,9 +138,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream for (int32_t j = 0; j < sinkLvSize; j++) { SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); if (pLastLevelTask->nodeId == pVgInfo->vgId) { - ASSERT(pVgInfo->vgId > 0); pVgInfo->taskId = pLastLevelTask->taskId; - ASSERT(pVgInfo->taskId != 0); break; } } @@ -152,7 +148,6 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; SArray* pArray = taosArrayGetP(pStream->tasks, 0); // one sink only - ASSERT(taosArrayGetSize(pArray) == 1); SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; @@ -170,7 +165,6 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, co plan->execNode.epSet = pTask->epSet; if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { - ASSERT(0); terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } @@ -195,7 +189,6 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, plan->execNode.epSet = pTask->epSet; if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { - ASSERT(0); terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } @@ -222,8 +215,6 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { void* pIter = NULL; SArray* tasks = taosArrayGetP(pStream->tasks, 0); - ASSERT(taosArrayGetSize(pStream->tasks) == 1); - while (1) { SVgObj* pVgroup = NULL; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); @@ -257,7 +248,10 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { pTask->tbSink.stbUid = pStream->targetStbUid; memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); - ASSERT(pTask->tbSink.pSchemaWrapper); + if (pTask->tbSink.pSchemaWrapper == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } } sdbRelease(pSdb, pVgroup); } @@ -265,7 +259,6 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { } int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { - ASSERT(pStream->fixedSinkVgId != 0); SArray* tasks = taosArrayGetP(pStream->tasks, 0); SStreamTask* pTask = tNewSStreamTask(pStream->uid); if (pTask == NULL) { @@ -275,8 +268,6 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { pTask->fillHistory = pStream->fillHistory; mndAddTaskToTaskSet(tasks, pTask); - ASSERT(pStream->fixedSinkVg.vgId == pStream->fixedSinkVgId); - pTask->nodeId = pStream->fixedSinkVgId; #if 0 SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId); @@ -311,13 +302,16 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans); - ASSERT(planTotLevel <= 2); + pStream->tasks = taosArrayInit(planTotLevel, sizeof(void*)); bool hasExtraSink = false; bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb); - ASSERT(pDbObj != NULL); + if (pDbObj == NULL) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } bool multiTarget = pDbObj->cfg.numOfVgroups > 1; sdbRelease(pSdb, pDbObj); @@ -351,7 +345,10 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); - ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE); + if (plan->subplanType != SUBPLAN_TYPE_MERGE) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } pInnerTask = tNewSStreamTask(pStream->uid); if (pInnerTask == NULL) { @@ -409,7 +406,10 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1); SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); - ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN); + if (plan->subplanType != SUBPLAN_TYPE_SCAN) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } void* pIter = NULL; while (1) { @@ -471,9 +471,15 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { taosArrayPush(pStream->tasks, &taskOneLevel); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); - ASSERT(LIST_LENGTH(inner->pNodeList) == 1); + if (LIST_LENGTH(inner->pNodeList) != 1) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); - ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN); + if (plan->subplanType != SUBPLAN_TYPE_SCAN) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } void* pIter = NULL; while (1) { @@ -550,9 +556,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); } - ASSERT(pSub->unassignedVgs); - ASSERT(taosHashGetSize(pSub->consumerHash) == 0); - void* pIter = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); @@ -590,10 +593,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib sdbRelease(pSdb, pVgroup); } - ASSERT(pSub->unassignedVgs->size > 0); - - ASSERT(taosHashGetSize(pSub->consumerHash) == 0); - qDestroyQueryPlan(pPlan); return 0; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 05c0594339..10ecd56280 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -326,13 +326,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, // deserialize ast if (nodesStringToNode(pObj->ast, &pAst) < 0) { - /*ASSERT(0);*/ goto FAIL; } // extract output schema from ast if (qExtractResultSchema(pAst, (int32_t *)&pObj->outputSchema.nCols, &pObj->outputSchema.pSchema) != 0) { - /*ASSERT(0);*/ goto FAIL; } @@ -347,13 +345,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, // using ast and param to build physical plan if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { - /*ASSERT(0);*/ goto FAIL; } // save physcial plan if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) { - /*ASSERT(0);*/ goto FAIL; } @@ -361,7 +357,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, if (pCreate->numOfTags) { pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema)); } - ASSERT(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags)); + /*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/ for (int32_t i = 0; i < pCreate->numOfTags; i++) { SField *pField = taosArrayGet(pCreate->pTags, i); pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1; @@ -378,9 +374,6 @@ FAIL: } int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) { - if (pTask->taskLevel == TASK_LEVEL__AGG) { - ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); - } SEncoder encoder; tEncoderInit(&encoder, NULL, 0); tEncodeSStreamTask(&encoder, pTask); @@ -545,8 +538,6 @@ _OVER: } static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { - ASSERT(pTask->nodeId != 0); - // vnode /*if (pTask->nodeId > 0) {*/ SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); @@ -800,10 +791,9 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { int32_t sz = taosArrayGetSize(pLevel); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - ASSERT(pTask->nodeId > 0); + /*A(pTask->nodeId > 0);*/ SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->nodeId); if (pVgObj == NULL) { - ASSERT(0); taosRUnLockLatch(&pStream->lock); mndReleaseStream(pMnode, pStream); mndTransDrop(pTrans); @@ -863,7 +853,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { SMDropStreamReq dropReq = {0}; if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) { - ASSERT(0); terrno = TSDB_CODE_INVALID_MSG; return -1; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4a941b3c20..222339d387 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -92,21 +92,21 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo); if (tqMetaOpen(pTq) < 0) { - ASSERT(0); + return NULL; } pTq->pOffsetStore = tqOffsetOpen(pTq); if (pTq->pOffsetStore == NULL) { - ASSERT(0); + return NULL; } pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId); if (pTq->pStreamMeta == NULL) { - ASSERT(0); + return NULL; } if (streamLoadTasks(pTq->pStreamMeta) < 0) { - ASSERT(0); + return NULL; } return pTq; @@ -166,19 +166,17 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { SMqDataRsp* pRsp = &pPushEntry->dataRsp; - ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); - ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); +#if 0 + A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); + A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); - ASSERT(!pRsp->withSchema); - ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); + A(!pRsp->withSchema); + A(taosArrayGetSize(pRsp->blockSchema) == 0); if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { - /*if (pRsp->blockNum > 0) {*/ - /*ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);*/ - /*} else {*/ - ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version); - /*}*/ + A(pRsp->rspOffset.version > pRsp->reqOffset.version); } +#endif int32_t len = 0; int32_t code = 0; @@ -223,19 +221,21 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { } int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) { - ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); - ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); +#if 0 + A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); + A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); - ASSERT(!pRsp->withSchema); - ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); + A(!pRsp->withSchema); + A(taosArrayGetSize(pRsp->blockSchema) == 0); if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { if (pRsp->blockNum > 0) { - ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version); + A(pRsp->rspOffset.version > pRsp->reqOffset.version); } else { - ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version); + A(pRsp->rspOffset.version >= pRsp->reqOffset.version); } } +#endif int32_t len = 0; int32_t code = 0; @@ -279,22 +279,24 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con } int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) { - ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); - ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); +#if 0 + A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); + A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); if (pRsp->withSchema) { - ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum); + A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum); } else { - ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); + A(taosArrayGetSize(pRsp->blockSchema) == 0); } if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { if (pRsp->blockNum > 0) { - ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version); + A(pRsp->rspOffset.version > pRsp->reqOffset.version); } else { - ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version); + A(pRsp->rspOffset.version >= pRsp->reqOffset.version); } } +#endif int32_t len = 0; int32_t code = 0; @@ -348,7 +350,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m SDecoder decoder; tDecoderInit(&decoder, msg, msgLen); if (tDecodeSTqOffset(&decoder, &offset) < 0) { - ASSERT(0); return -1; } tDecoderClear(&decoder); @@ -362,8 +363,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m if (offset.val.version + 1 == version) { offset.val.version += 1; } - } else { - ASSERT(0); + /*} else {*/ + /*A(0);*/ } STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey); if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) { @@ -371,7 +372,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m } if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { - ASSERT(0); return -1; } @@ -434,7 +434,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su } #endif - ASSERT(subType == TOPIC_SUB_TYPE__COLUMN); + /*A(subType == TOPIC_SUB_TYPE__COLUMN);*/ pRsp->withSchema = false; return 0; @@ -473,7 +473,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { // 1.find handle STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); - /*ASSERT(pHandle);*/ if (pHandle == NULL) { tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId, TD_VID(pTq->pVnode), req.subKey); @@ -560,7 +559,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType); // lock taosWLockLatch(&pTq->pushLock); - tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); + if (tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) { + return -1; + } #if 1 if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && @@ -599,7 +600,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } // for taosx - ASSERT(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN); + /*A(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN);*/ SMqMetaRsp metaRsp = {0}; @@ -607,7 +608,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqInitTaosxRsp(&taosxRsp, &req); if (fetchOffsetNew.type != TMQ_OFFSET__LOG) { - tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew); + if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew) < 0) { + return -1; + } if (metaRsp.metaRspLen > 0) { if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) { @@ -690,8 +693,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } } else { - ASSERT(pHandle->fetchMeta); - ASSERT(IS_META_MSG(pHead->msgType)); + /*A(pHandle->fetchMeta);*/ + /*A(IS_META_MSG(pHead->msgType));*/ tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); metaRsp.resMsgType = pHead->msgType; @@ -808,7 +811,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL // TODO version should be assigned and refed during preprocess SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal); if (pRef == NULL) { - ASSERT(0); return -1; } int64_t ver = pRef->refVer; @@ -829,12 +831,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, NULL); - ASSERT(pHandle->execHandle.task); + /*A(pHandle->execHandle.task);*/ void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.task, &scanner); - ASSERT(scanner); + /*A(scanner);*/ pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); - ASSERT(pHandle->execHandle.pExecReader); + /*A(pHandle->execHandle.pExecReader);*/ } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); @@ -867,19 +869,14 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { - // TODO - ASSERT(0); } } else { - /*ASSERT(pExec->consumerId == req.oldConsumerId);*/ // TODO handle qmsg and exec modification atomic_store_32(&pHandle->epoch, -1); atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); taosMemoryFree(req.qmsg); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { - // TODO - ASSERT(0); } // close handle } @@ -888,9 +885,11 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { +#if 0 if (pTask->taskLevel == TASK_LEVEL__AGG) { - ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); + A(taosArrayGetSize(pTask->childEpInfo) != 0); } +#endif pTask->refCnt = 1; pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; @@ -927,7 +926,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { .pStateBackend = pTask->pState, }; pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); - ASSERT(pTask->exec.executor); + if (pTask->exec.executor == NULL) { + return -1; + } } else if (pTask->taskLevel == TASK_LEVEL__AGG) { pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); @@ -940,7 +941,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { .pStateBackend = pTask->pState, }; pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle); - ASSERT(pTask->exec.executor); + if (pTask->exec.executor == NULL) { + return -1; + } } // sink @@ -952,12 +955,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->tbSink.vnode = pTq->pVnode; pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline; - ASSERT(pTask->tbSink.pSchemaWrapper); - ASSERT(pTask->tbSink.pSchemaWrapper->pSchema); + /*A(pTask->tbSink.pSchemaWrapper);*/ + /*A(pTask->tbSink.pSchemaWrapper->pSchema);*/ pTask->tbSink.pTSchema = tdGetSTSChemaFromSSChema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, 1); - ASSERT(pTask->tbSink.pTSchema); + /*A(pTask->tbSink.pTSchema);*/ } streamSetupTrigger(pTask); @@ -1003,7 +1006,8 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { int32_t len; tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code); if (code < 0) { - ASSERT(0); + tqError("unable to encode rsp %d", __LINE__); + return -1; } void* buf = rpcMallocCont(sizeof(SMsgHead) + len); ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId); @@ -1096,12 +1100,10 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { if (pTask == NULL) { return -1; } - ASSERT(pReq->taskId == pTask->taskId); // check param int64_t fillVer1 = pTask->startVer; if (fillVer1 <= 0) { - ASSERT(0); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return -1; } @@ -1296,7 +1298,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { } int32_t ref = atomic_sub_fetch_32(pRef, 1); - ASSERT(ref >= 0); + /*A(ref >= 0);*/ if (ref == 0) { blockDataDestroy(pDelBlock); taosMemoryFree(pRef); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 093186ebbb..426fceb7ed 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -29,7 +29,6 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); actualLen += sizeof(SRetrieveTableRsp); - ASSERT(actualLen <= dataStrLen); taosArrayPush(pRsp->blockDataLen, &actualLen); taosArrayPush(pRsp->blockData, &buf); return 0; @@ -62,7 +61,6 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { const STqExecHandle* pExec = &pHandle->execHandle; - ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); qTaskInfo_t task = pExec->task; @@ -87,7 +85,8 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs uint64_t ts = 0; tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId); if (qExecTask(task, &pDataBlock, &ts) < 0) { - ASSERT(0); + tqError("vgId:%d task exec error since %s", pTq->pVnode->config.vgId, terrstr()); + return -1; } tqDebug("vgId:%d, tmq task executed, get %p", pTq->pVnode->config.vgId, pDataBlock); @@ -105,10 +104,14 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs } if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) { - ASSERT(0); return -1; } - ASSERT(pRsp->rspOffset.type != 0); + + if (pRsp->rspOffset.type == 0) { + tqError("expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, pRsp->rspOffset.type, pRsp->rspOffset.ts, + pRsp->rspOffset.uid, pRsp->rspOffset.version); + return -1; + } if (pRsp->withTbName) { if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) { @@ -118,7 +121,6 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs pRsp->withTbName = false; } } - ASSERT(pRsp->withSchema == false); return 0; } @@ -148,7 +150,8 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta uint64_t ts = 0; tqDebug("tmqsnap task start to execute"); if (qExecTask(task, &pDataBlock, &ts) < 0) { - ASSERT(0); + tqError("vgId:%d task exec error since %s", pTq->pVnode->config.vgId, terrstr()); + return -1; } tqDebug("tmqsnap task execute end, get %p", pDataBlock); @@ -215,17 +218,20 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta break; } - if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) { - ASSERT(0); + qStreamExtractOffset(task, &pRsp->rspOffset); + + if (pRsp->rspOffset.type == 0) { + tqError("expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, pRsp->rspOffset.type, pRsp->rspOffset.ts, + pRsp->rspOffset.uid, pRsp->rspOffset.version); + return -1; } - ASSERT(pRsp->rspOffset.type != 0); return 0; } int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp) { STqExecHandle* pExec = &pHandle->execHandle; - ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); + /*A(pExec->subType != TOPIC_SUB_TYPE__COLUMN);*/ SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pSchemas = taosArrayInit(0, sizeof(void*)); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index f476f58b56..05ed8d7348 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -71,17 +71,14 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { int32_t tqMetaOpen(STQ* pTq) { if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0) < 0) { - ASSERT(0); return -1; } if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0) < 0) { - ASSERT(0); return -1; } if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0) < 0) { - ASSERT(0); return -1; } @@ -135,19 +132,19 @@ int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) { if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - ASSERT(0); + return -1; } if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), txn) < 0) { - /*ASSERT(0);*/ + tqWarn("vgId:%d, tq try delete checkinfo failed %s", pTq->pVnode->config.vgId, key); } if (tdbCommit(pTq->pMetaDB, txn) < 0) { - ASSERT(0); + return -1; } if (tdbPostCommit(pTq->pMetaDB, txn) < 0) { - ASSERT(0); + return -1; } return 0; @@ -156,7 +153,6 @@ int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) { int32_t tqMetaRestoreCheckInfo(STQ* pTq) { TBC* pCur = NULL; if (tdbTbcOpen(pTq->pCheckStore, &pCur, NULL) < 0) { - ASSERT(0); return -1; } @@ -197,40 +193,42 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { int32_t code; int32_t vlen; tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code); - ASSERT(code == 0); + if (code < 0) { + return -1; + } tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey), pHandle->consumerId, TD_VID(pTq->pVnode)); void* buf = taosMemoryCalloc(1, vlen); if (buf == NULL) { - ASSERT(0); + return -1; } SEncoder encoder; tEncoderInit(&encoder, buf, vlen); if (tEncodeSTqHandle(&encoder, pHandle) < 0) { - ASSERT(0); + return -1; } TXN* txn; if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - ASSERT(0); + return -1; } if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn) < 0) { - ASSERT(0); + return -1; } if (tdbCommit(pTq->pMetaDB, txn) < 0) { - ASSERT(0); + return -1; } if (tdbPostCommit(pTq->pMetaDB, txn) < 0) { - ASSERT(0); + return -1; } tEncoderClear(&encoder); @@ -243,19 +241,18 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - ASSERT(0); + return -1; } if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), txn) < 0) { - /*ASSERT(0);*/ } if (tdbCommit(pTq->pMetaDB, txn) < 0) { - ASSERT(0); + return -1; } if (tdbPostCommit(pTq->pMetaDB, txn) < 0) { - ASSERT(0); + return -1; } return 0; @@ -264,7 +261,6 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { int32_t tqMetaRestoreHandle(STQ* pTq) { TBC* pCur = NULL; if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) { - ASSERT(0); return -1; } @@ -284,7 +280,6 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { handle.pRef = walOpenRef(pTq->pVnode->pWal); if (handle.pRef == NULL) { - ASSERT(0); return -1; } walRefVer(handle.pRef, handle.snapshotVer); @@ -300,12 +295,19 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { handle.execHandle.task = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, NULL); - ASSERT(handle.execHandle.task); + if (handle.execHandle.task == NULL) { + tqError("cannot create exec task for %s", handle.subKey); + return -1; + } void* scanner = NULL; qExtractStreamScanner(handle.execHandle.task, &scanner); - ASSERT(scanner); + if (scanner == NULL) { + tqError("cannot extract stream scanner for %s", handle.subKey); + } handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); - ASSERT(handle.execHandle.pExecReader); + if (handle.execHandle.pExecReader == NULL) { + tqError("cannot extract exec reader for %s", handle.subKey); + } } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) { handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode); diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index dd56c165fd..5a4d414ab7 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -40,26 +40,23 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { if (code == 0) { break; } else { - ASSERT(0); - // TODO handle error + return -1; } } int32_t size = htonl(head.size); void* memBuf = taosMemoryCalloc(1, size); if ((code = taosReadFile(pFile, memBuf, size)) != size) { - ASSERT(0); - // TODO handle error + return -1; } STqOffset offset; SDecoder decoder; tDecoderInit(&decoder, memBuf, size); if (tDecodeSTqOffset(&decoder, &offset) < 0) { - ASSERT(0); + return -1; } tDecoderClear(&decoder); if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) { - ASSERT(0); - // TODO + return -1; } taosMemoryFree(memBuf); } @@ -85,7 +82,9 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { } char* fname = tqOffsetBuildFName(pStore->pTq->path, 0); if (tqOffsetRestoreFromFile(pStore, fname) < 0) { - ASSERT(0); + taosMemoryFree(fname); + taosMemoryFree(pStore); + return NULL; } taosMemoryFree(fname); return pStore; @@ -124,7 +123,6 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { const char* sysErrStr = strerror(errno); tqError("vgId:%d, cannot open file %s when commit offset since %s", pStore->pTq->pVnode->config.vgId, fname, sysErrStr); - ASSERT(0); return -1; } taosMemoryFree(fname); @@ -136,9 +134,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { int32_t bodyLen; int32_t code; tEncodeSize(tEncodeSTqOffset, pOffset, bodyLen, code); - ASSERT(code == 0); if (code < 0) { - ASSERT(0); taosHashCancelIterate(pStore->pHash, pIter); return -1; } @@ -154,7 +150,6 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { // write file int64_t writeLen; if ((writeLen = taosWriteFile(pFile, buf, totLen)) != totLen) { - ASSERT(0); tqError("write offset incomplete, len %d, write len %" PRId64, bodyLen, writeLen); taosHashCancelIterate(pStore->pHash, pIter); taosMemoryFree(buf); diff --git a/source/dnode/vnode/src/tq/tqOffsetSnapshot.c b/source/dnode/vnode/src/tq/tqOffsetSnapshot.c index b63ff8af1d..2413a792c6 100644 --- a/source/dnode/vnode/src/tq/tqOffsetSnapshot.c +++ b/source/dnode/vnode/src/tq/tqOffsetSnapshot.c @@ -56,24 +56,28 @@ int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData) { TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); if (pFile == NULL) { taosMemoryFree(fname); - return 0; + return -1; } int64_t sz = 0; if (taosStatFile(fname, &sz, NULL) < 0) { - ASSERT(0); + taosCloseFile(&pFile); + taosMemoryFree(fname); + return -1; } taosMemoryFree(fname); SSnapDataHdr* buf = taosMemoryCalloc(1, sz + sizeof(SSnapDataHdr)); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + taosCloseFile(&pFile); return terrno; } void* abuf = POINTER_SHIFT(buf, sizeof(SSnapDataHdr)); int64_t contLen = taosReadFile(pFile, abuf, sz); if (contLen != sz) { - ASSERT(0); + taosCloseFile(&pFile); + taosMemoryFree(buf); return -1; } buf->size = sz; @@ -122,14 +126,17 @@ int32_t tqOffsetWriterClose(STqOffsetWriter** ppWriter, int8_t rollback) { if (rollback) { if (taosRemoveFile(pWriter->fname) < 0) { - ASSERT(0); + taosMemoryFree(fname); + return -1; } } else { if (taosRenameFile(pWriter->fname, fname) < 0) { - ASSERT(0); + taosMemoryFree(fname); + return -1; } if (tqOffsetRestoreFromFile(pTq->pOffsetStore, fname) < 0) { - ASSERT(0); + taosMemoryFree(fname); + return -1; } } taosMemoryFree(fname); @@ -146,14 +153,13 @@ int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nDa TdFilePtr pFile = taosOpenFile(pWriter->fname, TD_FILE_CREATE | TD_FILE_WRITE); SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; int64_t size = pHdr->size; - ASSERT(size == nData - sizeof(SSnapDataHdr)); if (pFile) { int64_t contLen = taosWriteFile(pFile, pHdr->data, size); if (contLen != size) { - ASSERT(0); + taosCloseFile(&pFile); + return -1; } } else { - ASSERT(0); return -1; } return 0; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index f89bc20362..63c3c25218 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -25,9 +25,7 @@ void tqTmrRspFunc(void* param, void* tmrId) { static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubmit** ppSubmit, SMqDataRsp* pRsp) { SStreamDataSubmit* pSubmit = *ppSubmit; while (pSubmit != NULL) { - ASSERT(pSubmit->ver == pHandle->pushHandle.processedVer + 1); if (tqLogScanExec(pTq, &pHandle->execHandle, pSubmit->data, pRsp, 0) < 0) { - /*ASSERT(0);*/ } // update processed atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver); @@ -160,8 +158,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ if (msgType == TDMT_VND_SUBMIT) { tqLogScanExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId); } else { - // TODO - ASSERT(0); + tqError("tq push unexpected msg type %d", msgType); } if (rsp.blockNum == 0) { @@ -169,9 +166,6 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ continue; } - ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum); - ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum); - rsp.rspOffset = fetchOffset; int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp); @@ -263,7 +257,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; if (qExecTask(task, &pDataBlock, &ts) < 0) { - ASSERT(0); + tqDebug("vgId:%d, tq exec error since %s", pTq->pVnode->config.vgId, terrstr()); } if (pDataBlock == NULL) { @@ -282,7 +276,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) // remove from hash size_t kLen; void* key = taosHashGetKey(pIter, &kLen); - void* keyCopy = taosMemoryMalloc(kLen); + void* keyCopy = taosMemoryCalloc(1, kLen + 1); memcpy(keyCopy, key, kLen); taosArrayPush(cachedKeys, &keyCopy); @@ -296,7 +290,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) void* key = taosArrayGetP(cachedKeys, i); size_t kLen = *(size_t*)taosArrayGet(cachedKeyLens, i); if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) { - ASSERT(0); + tqError("vgId:%d, tq push hash remove key error, key: %s", pTq->pVnode->config.vgId, (char*)key); } } taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 46b31bc5b0..e5392bbd02 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -176,8 +176,6 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) { goto end; } realTbSuid = req.suid; - } else { - ASSERT(0); } end: @@ -206,7 +204,6 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea code = walFetchBody(pHandle->pWalReader, ppCkHead); if (code < 0) { - ASSERT(0); *fetchOffset = offset; code = -1; goto END; @@ -220,7 +217,6 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea if (IS_META_MSG(pHead->msgType)) { code = walFetchBody(pHandle->pWalReader, ppCkHead); if (code < 0) { - ASSERT(0); *fetchOffset = offset; code = -1; goto END; @@ -238,7 +234,6 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea } code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead); if (code < 0) { - ASSERT(0); *fetchOffset = offset; code = -1; goto END; @@ -297,11 +292,8 @@ void tqCloseReader(STqReader* pReader) { int32_t tqSeekVer(STqReader* pReader, int64_t ver) { if (walReadSeekVer(pReader->pWalReader, ver) < 0) { - ASSERT(pReader->pWalReader->curInvalid); - ASSERT(pReader->pWalReader->curVersion == ver); return -1; } - ASSERT(pReader->pWalReader->curVersion == ver); return 0; } @@ -317,7 +309,6 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ret->offset.version = pReader->ver; ret->fetchType = FETCH_TYPE__NONE; tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version); - ASSERT(ret->offset.version >= 0); return -1; } void* body = pReader->pWalReader->pHead->head.body; @@ -340,7 +331,6 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { memset(&ret->data, 0, sizeof(SSDataBlock)); int32_t code = tqRetrieveDataBlock(&ret->data, pReader); if (code != 0 || ret->data.info.rows == 0) { - ASSERT(0); continue; } ret->fetchType = FETCH_TYPE__DATA; @@ -351,7 +341,6 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { if (fromProcessedMsg) { ret->offset.type = TMQ_OFFSET__LOG; ret->offset.version = pReader->ver; - ASSERT(pReader->ver >= 0); ret->fetchType = FETCH_TYPE__SEP; tqDebug("return offset %" PRId64 ", processed finish", ret->offset.version); return 0; @@ -434,7 +423,6 @@ bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) { } if (pHandle->pBlock == NULL) return false; - ASSERT(pHandle->tbIdHash == NULL); void* ret = taosHashGet(filterOutUids, &pHandle->msgIter.uid, sizeof(int64_t)); if (ret == NULL) { return true; @@ -453,7 +441,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) { if (pReader->pSchema == NULL) { tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table", pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer); - /*ASSERT(0);*/ pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; @@ -464,7 +451,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) { if (pReader->pSchemaWrapper == NULL) { tqWarn("cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", pReader->msgIter.uid, pReader->cachedSchemaVer); - /*ASSERT(0);*/ pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; @@ -567,7 +553,6 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas if (pReader->pSchema == NULL) { tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table", pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer); - /*ASSERT(0);*/ pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; @@ -578,7 +563,6 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas if (pReader->pSchemaWrapper == NULL) { tqWarn("cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", pReader->msgIter.uid, pReader->cachedSchemaVer); - /*ASSERT(0);*/ pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; @@ -671,8 +655,6 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas break; } - ASSERT(sVal.valType != TD_VTYPE_NONE); - if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) { goto FAIL; } @@ -732,8 +714,6 @@ int tqReaderAddTbUidList(STqReader* pReader, const SArray* tbUidList) { } int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) { - ASSERT(pReader->tbIdHash != NULL); - for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i); taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t)); @@ -750,7 +730,10 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { STqHandle* pExec = (STqHandle*)pIter; if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { int32_t code = qUpdateQualifiedTableId(pExec->execHandle.task, tbUidList, isAdd); - ASSERT(code == 0); + if (code != 0) { + tqError("update qualified table error for %s", pExec->subKey); + continue; + } } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) { if (!isAdd) { int32_t sz = taosArrayGetSize(tbUidList); @@ -769,7 +752,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { int32_t code = metaGetTableEntryByUidCache(&mr, *id); if (code != TSDB_CODE_SUCCESS) { - qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno)); + tqError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno)); continue; } @@ -790,8 +773,6 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } else { // TODO handle delete table from stb } - } else { - ASSERT(0); } } while (1) { @@ -800,7 +781,10 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->taskLevel == TASK_LEVEL__SOURCE) { int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd); - ASSERT(code == 0); + if (code != 0) { + tqError("update qualified table error for stream task %d", pTask->taskId); + continue; + } } } return 0; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 5907be576a..078b50db68 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -19,7 +19,6 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq) { - ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT); int32_t totRow = pDataBlock->info.rows; SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -334,8 +333,8 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d int32_t code; tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); if (code < 0) { - // - ASSERT(0); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return; } SEncoder encoder; void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); @@ -559,7 +558,6 @@ void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, (int32_t)pRes->size); - ASSERT(pTask->tbSink.pTSchema); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, pTask->tbSink.pSchemaWrapper, true, pTask->tbSink.stbUid, pTask->tbSink.stbFullName, &deleteReq); @@ -570,10 +568,6 @@ void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data int32_t code; int32_t len; tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); - if (code < 0) { - // - ASSERT(0); - } SEncoder encoder; void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)); diff --git a/source/dnode/vnode/src/tq/tqSnapshot.c b/source/dnode/vnode/src/tq/tqSnapshot.c index d811d943ed..ab7093a701 100644 --- a/source/dnode/vnode/src/tq/tqSnapshot.c +++ b/source/dnode/vnode/src/tq/tqSnapshot.c @@ -100,8 +100,6 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) { } } - ASSERT(pVal && vLen); - *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); if (*ppData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index b1f00bdf74..ab7093a701 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -100,8 +100,6 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) { } } - ASSERT(pVal && vLen); - *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); if (*ppData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -168,7 +166,6 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { if (rollback) { tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn); - ASSERT(0); } else { code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn); if (code) goto _err; diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 305378bc93..ab7093a701 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -100,8 +100,6 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) { } } - ASSERT(pVal && vLen); - *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); if (*ppData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -146,7 +144,7 @@ int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** p pWriter->sver = sver; pWriter->ever = ever; - if (tdbBegin(pTq->pMetaStore, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { + if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { code = -1; taosMemoryFree(pWriter); goto _err; @@ -167,12 +165,11 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { STQ* pTq = pWriter->pTq; if (rollback) { - tdbAbort(pWriter->pTq->pMetaStore, pWriter->txn); - ASSERT(0); + tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn); } else { - code = tdbCommit(pWriter->pTq->pMetaStore, pWriter->txn); + code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn); if (code) goto _err; - code = tdbPostCommit(pWriter->pTq->pMetaStore, pWriter->txn); + code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn); if (code) goto _err; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d074ceede8..1414d3b4ab 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -768,8 +768,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num); ASSERT(pInfo->base.dataReader == NULL); - int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, - pInfo->pResBlock, (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo)); + int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, + (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -986,8 +986,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU SSDataBlock* pBlock = pTableScanInfo->pResBlock; STsdbReader* pReader = NULL; - int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, (STsdbReader**)&pReader, - GET_TASKID(pTaskInfo)); + int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, + (STsdbReader**)&pReader, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { terrno = code; T_LONG_JMP(pTaskInfo->env, code); @@ -995,7 +995,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU } if (tsdbNextDataBlock(pReader)) { - /*SSDataBlock* p = */tsdbRetrieveDataBlock(pReader, NULL); + /*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL); doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); } @@ -1224,7 +1224,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX); - uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData; + uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData; ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; @@ -1347,6 +1347,36 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, return code; } +#if 0 +void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { + SExprSupp* pTagCalSup = &pInfo->tagCalSup; + SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState; + if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return; + if (pBlock == NULL || pBlock->info.rows == 0) return; + + void* tag = NULL; + int32_t tagLen = 0; + if (streamStateGetParTag(pState, pBlock->info.id.groupId, &tag, &tagLen) == 0) { + pBlock->info.tagLen = tagLen; + void* pTag = taosMemoryRealloc(pBlock->info.pTag, tagLen); + if (pTag == NULL) { + tdbFree(tag); + taosMemoryFree(pBlock->info.pTag); + pBlock->info.pTag = NULL; + pBlock->info.tagLen = 0; + return; + } + pBlock->info.pTag = pTag; + memcpy(pBlock->info.pTag, tag, tagLen); + tdbFree(tag); + return; + } else { + pBlock->info.pTag = NULL; + } + tdbFree(tag); +} +#endif + void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup; SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState; @@ -1354,10 +1384,12 @@ void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { if (pBlock == NULL || pBlock->info.rows == 0) return; void* tbname = NULL; - if (streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { - pBlock->info.parTbName[0] = 0; - } else { + if (streamStateGetParName(pState, pBlock->info.id.groupId, &tbname) == 0) { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + tdbFree(tbname); + return; + } else { + pBlock->info.parTbName[0] = 0; } tdbFree(tbname); @@ -2285,7 +2317,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (pHandle->initTableReader) { pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->base.dataReader = NULL; - code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock, &pTSInfo->base.dataReader, NULL); + code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock, + &pTSInfo->base.dataReader, NULL); if (code != 0) { terrno = code; destroyTableScanOperatorInfo(pTableScanOp); @@ -2355,7 +2388,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL); + pOperator->fpSet = + createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL); return pOperator; @@ -2492,7 +2526,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL); + pOperator->fpSet = + createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL); return pOperator; @@ -2513,11 +2548,12 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx); - int64_t st = taosGetTimestampUs(); - void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); + int64_t st = taosGetTimestampUs(); + void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); SReadHandle* pHandle = &pInfo->base.readHandle; - int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo)); + int32_t code = + tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo)); if (code != 0) { T_LONG_JMP(pTaskInfo->env, code); } @@ -2915,8 +2951,8 @@ static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* SSDataBlock* pRes, char* dbName); static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName); -static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, - STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName); +static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, + STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName); static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes); static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, @@ -3041,8 +3077,8 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, optrDefaultBufFn, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, + optrDefaultBufFn, NULL); return pOperator; _error: diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index af1d738de0..cf388da92c 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -159,6 +159,11 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int goto _err; } + if (tdbTbOpen("partag.state.db", sizeof(int64_t), -1, NULL, pState->pTdbState->db, &pState->pTdbState->pParTagDb, 0) < + 0) { + goto _err; + } + if (streamStateBegin(pState) < 0) { goto _err; } @@ -173,6 +178,7 @@ _err: tdbTbClose(pState->pTdbState->pFillStateDb); tdbTbClose(pState->pTdbState->pSessionStateDb); tdbTbClose(pState->pTdbState->pParNameDb); + tdbTbClose(pState->pTdbState->pParTagDb); tdbClose(pState->pTdbState->db); streamStateDestroy(pState); return NULL; @@ -186,6 +192,7 @@ void streamStateClose(SStreamState* pState) { tdbTbClose(pState->pTdbState->pFillStateDb); tdbTbClose(pState->pTdbState->pSessionStateDb); tdbTbClose(pState->pTdbState->pParNameDb); + tdbTbClose(pState->pTdbState->pParTagDb); tdbClose(pState->pTdbState->db); streamStateDestroy(pState); @@ -821,10 +828,17 @@ _end: return res; } +int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) { + return tdbTbUpsert(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tag, tagLen, pState->pTdbState->txn); +} + +int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) { + return tdbTbGet(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tagVal, tagLen); +} + int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { - tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, - pState->pTdbState->txn); - return 0; + return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, + pState->pTdbState->txn); } int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) { diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 8e6628bb21..da8cf24627 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -25,7 +25,7 @@ bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) { } bool FORCE_INLINE walIsEmpty(SWal* pWal) { - return (pWal->vers.firstVer == -1 || pWal->vers.lastVer < pWal->vers.firstVer); // [firstVer, lastVer + 1) + return (pWal->vers.firstVer == -1 || pWal->vers.lastVer < pWal->vers.firstVer); // [firstVer, lastVer + 1) } int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; } @@ -49,7 +49,6 @@ static FORCE_INLINE int walBuildTmpMetaName(SWal* pWal, char* buf) { static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { int32_t sz = taosArrayGetSize(pWal->fileInfoSet); terrno = TSDB_CODE_SUCCESS; - ASSERT(fileIdx >= 0 && fileIdx < sz); SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); char fnameStr[WAL_FILE_LEN]; @@ -101,7 +100,6 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { offsetBackward = offset; } - ASSERT(offset <= end); readSize = end - offset; capacity = readSize + sizeof(magic); @@ -257,7 +255,6 @@ static void walRebuildFileInfoSet(SArray* metaLogList, SArray* actualLogList) { SWalFileInfo* pLogInfo = taosArrayGet(actualLogList, i); while (j < metaFileNum) { SWalFileInfo* pMetaInfo = taosArrayGet(metaLogList, j); - ASSERT(pMetaInfo != NULL); if (pMetaInfo->firstVer < pLogInfo->firstVer) { j++; } else if (pMetaInfo->firstVer == pLogInfo->firstVer) { @@ -385,7 +382,6 @@ int walCheckAndRepairMeta(SWal* pWal) { taosArrayDestroy(actualLog); int32_t sz = taosArrayGetSize(pWal->fileInfoSet); - ASSERT(sz == actualFileNum); // scan and determine the lastVer int32_t fileIdx = sz; @@ -403,8 +399,6 @@ int walCheckAndRepairMeta(SWal* pWal) { return -1; } - ASSERT(pFileInfo->firstVer >= 0); - if (pFileInfo->lastVer >= pFileInfo->firstVer && fileSize == pFileInfo->fileSize) { totSize += pFileInfo->fileSize; continue; @@ -417,7 +411,6 @@ int walCheckAndRepairMeta(SWal* pWal) { wError("failed to scan wal last ver since %s", terrstr()); return -1; } - ASSERT(pFileInfo->fileSize == 0); // remove the empty wal log, and its idx wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr); taosRemoveFile(fnameStr); @@ -477,8 +470,7 @@ int walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) { } int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) { - int32_t sz = taosArrayGetSize(pWal->fileInfoSet); - ASSERT(fileIdx >= 0 && fileIdx < sz); + int32_t sz = taosArrayGetSize(pWal->fileInfoSet); SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); char fnameStr[WAL_FILE_LEN]; walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); @@ -492,7 +484,6 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) { return -1; } - ASSERT(pFileInfo->fileSize > 0 && pFileInfo->firstVer >= 0 && pFileInfo->lastVer >= pFileInfo->firstVer); if (fileSize == (pFileInfo->lastVer - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry)) { return 0; } @@ -556,7 +547,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) { } offset += sizeof(SWalIdxEntry); - ASSERT(offset == (idxEntry.ver - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry)); + /*A(offset == (idxEntry.ver - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry));*/ // ftruncate idx file if (offset < fileSize) { @@ -577,7 +568,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) { } while (idxEntry.ver < pFileInfo->lastVer) { - ASSERT(idxEntry.ver == ckHead.head.version); + /*A(idxEntry.ver == ckHead.head.version);*/ idxEntry.ver += 1; idxEntry.offset += sizeof(SWalCkHead) + ckHead.head.bodyLen; @@ -649,8 +640,7 @@ int walRollFileInfo(SWal* pWal) { } char* walMetaSerialize(SWal* pWal) { - char buf[30]; - ASSERT(pWal->fileInfoSet); + char buf[30]; int sz = taosArrayGetSize(pWal->fileInfoSet); cJSON* pRoot = cJSON_CreateObject(); cJSON* pMeta = cJSON_CreateObject(); @@ -707,7 +697,7 @@ char* walMetaSerialize(SWal* pWal) { } int walMetaDeserialize(SWal* pWal, const char* bytes) { - ASSERT(taosArrayGetSize(pWal->fileInfoSet) == 0); + /*A(taosArrayGetSize(pWal->fileInfoSet) == 0);*/ cJSON *pRoot, *pMeta, *pFiles, *pInfoJson, *pField; pRoot = cJSON_Parse(bytes); if (!pRoot) goto _err; @@ -823,7 +813,9 @@ int walSaveMeta(SWal* pWal) { // flush to a tmpfile n = walBuildTmpMetaName(pWal, tmpFnameStr); - ASSERT(n < sizeof(tmpFnameStr) && "Buffer overflow of file name"); + if (n >= sizeof(tmpFnameStr)) { + return -1; + } TdFilePtr pMetaFile = taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pMetaFile == NULL) { @@ -854,7 +846,9 @@ int walSaveMeta(SWal* pWal) { // rename it n = walBuildMetaName(pWal, metaVer + 1, fnameStr); - ASSERT(n < sizeof(fnameStr) && "Buffer overflow of file name"); + if (n >= sizeof(fnameStr)) { + goto _err; + } if (taosRenameFile(tmpFnameStr, fnameStr) < 0) { wError("failed to rename file due to %s. dest:%s", strerror(errno), fnameStr); @@ -877,7 +871,6 @@ _err: } int walLoadMeta(SWal* pWal) { - ASSERT(pWal->fileInfoSet->size == 0); // find existing meta file int metaVer = walFindCurMetaVer(pWal); if (metaVer == -1) { diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index ed02d29e3b..a89c4bb546 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -97,7 +97,6 @@ int32_t walNextValidMsg(SWalReader *pReader) { return -1; } fetchVer++; - ASSERT(fetchVer == pReader->curVersion); } } pReader->curStopped = 1; @@ -132,7 +131,6 @@ static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int return -1; } - ASSERT(entry.ver == ver); ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET); if (ret < 0) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -241,7 +239,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { if (pRead->curInvalid || pRead->curVersion != fetchVer) { if (walReadSeekVer(pRead, fetchVer) < 0) { - ASSERT(0); pRead->curVersion = fetchVer; pRead->curInvalid = 1; return -1; @@ -262,7 +259,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { } else { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } - ASSERT(0); pRead->curInvalid = 1; return -1; } @@ -299,7 +295,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } pRead->curInvalid = 1; - ASSERT(0); return -1; } @@ -308,7 +303,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { pRead->pHead->head.version, ver); pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - ASSERT(0); return -1; } @@ -316,7 +310,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - ASSERT(0); return -1; } @@ -328,14 +321,10 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { static int32_t walSkipFetchBodyNew(SWalReader *pRead) { int64_t code; - ASSERT(pRead->curVersion == pRead->pHead->head.version); - ASSERT(pRead->curInvalid == 0); - code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR); if (code < 0) { terrno = TAOS_SYSTEM_ERROR(errno); pRead->curInvalid = 1; - ASSERT(0); return -1; } @@ -384,7 +373,6 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { } else { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } - ASSERT(0); pRead->curInvalid = 1; return -1; } @@ -410,9 +398,6 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) { pRead->pWal->cfg.vgId, pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer); - ASSERT(pRead->curVersion == pHead->head.version); - ASSERT(pRead->curInvalid == 0); - code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR); if (code < 0) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -447,7 +432,6 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) { if (pReadHead->bodyLen < 0) { - ASSERT(0); terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno)); @@ -457,12 +441,10 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } pRead->curInvalid = 1; - ASSERT(0); return -1; } if (pReadHead->version != ver) { - ASSERT(0); wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, pReadHead->version, ver); pRead->curInvalid = 1; @@ -471,7 +453,6 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { } if (walValidBodyCksum(*ppHead) != 0) { - ASSERT(0); wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); pRead->curInvalid = 1; diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index e86111109c..fa04ba3e58 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -61,7 +61,7 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) { SWalFileInfo tmpInfo; tmpInfo.firstVer = ver; SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); - ASSERT(pRet != NULL); + /*A(pRet != NULL);*/ pRef->refFile = pRet->firstVer; taosThreadMutexUnlock(&pWal->mutex); @@ -80,6 +80,7 @@ void walUnrefVer(SWalRef *pRef) { SWalRef *walRefCommittedVer(SWal *pWal) { SWalRef *pRef = walOpenRef(pWal); if (pRef == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } taosThreadMutexLock(&pWal->mutex); @@ -91,7 +92,7 @@ SWalRef *walRefCommittedVer(SWal *pWal) { SWalFileInfo tmpInfo; tmpInfo.firstVer = ver; SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); - ASSERT(pRet != NULL); + /*A(pRet != NULL);*/ pRef->refFile = pRet->firstVer; taosThreadMutexUnlock(&pWal->mutex); diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c index 2cb6614b01..cbfd0ef741 100644 --- a/source/libs/wal/src/walSeek.c +++ b/source/libs/wal/src/walSeek.c @@ -40,7 +40,6 @@ static int64_t walSeekWritePos(SWal* pWal, int64_t ver) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - ASSERT(entry.ver == ver); code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET); if (code < 0) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -53,8 +52,7 @@ static int64_t walSeekWritePos(SWal* pWal, int64_t ver) { int walInitWriteFile(SWal* pWal) { TdFilePtr pIdxTFile, pLogTFile; SWalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet); - ASSERT(pRet != NULL); - int64_t fileFirstVer = pRet->firstVer; + int64_t fileFirstVer = pRet->firstVer; char fnameStr[WAL_FILE_LEN]; walBuildIdxName(pWal, fileFirstVer, fnameStr); @@ -109,9 +107,8 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) { tmpInfo.firstVer = ver; // bsearch in fileSet int32_t idx = taosArraySearchIdx(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); - ASSERT(idx != -1); + /*A(idx != -1);*/ SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, idx); - /*ASSERT(pFileInfo != NULL);*/ int64_t fileFirstVer = pFileInfo->firstVer; walBuildIdxName(pWal, fileFirstVer, fnameStr); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index a5c7bf1abd..4233c089a4 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -87,12 +87,10 @@ int32_t walApplyVer(SWal *pWal, int64_t ver) { } int32_t walCommit(SWal *pWal, int64_t ver) { - ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer); - ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer); if (ver < pWal->vers.commitVer) { return 0; } - if (ver > pWal->vers.lastVer) { + if (ver > pWal->vers.lastVer || pWal->vers.commitVer < pWal->vers.snapshotVer) { terrno = TSDB_CODE_WAL_INVALID_VER; return -1; } @@ -138,25 +136,21 @@ int32_t walRollback(SWal *pWal, int64_t ver) { TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); if (pIdxFile == NULL) { - ASSERT(0); taosThreadMutexUnlock(&pWal->mutex); return -1; } int64_t idxOff = walGetVerIdxOffset(pWal, ver); code = taosLSeekFile(pIdxFile, idxOff, SEEK_SET); if (code < 0) { - ASSERT(0); taosThreadMutexUnlock(&pWal->mutex); return -1; } // read idx file and get log file pos SWalIdxEntry entry; if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { - ASSERT(0); taosThreadMutexUnlock(&pWal->mutex); return -1; } - ASSERT(entry.ver == ver); walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr); TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); @@ -176,24 +170,19 @@ int32_t walRollback(SWal *pWal, int64_t ver) { } // validate offset SWalCkHead head; - ASSERT(taosValidFile(pLogFile)); - int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead)); + int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead)); if (size != sizeof(SWalCkHead)) { - ASSERT(0); taosThreadMutexUnlock(&pWal->mutex); return -1; } code = walValidHeadCksum(&head); - ASSERT(code == 0); if (code != 0) { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - ASSERT(0); taosThreadMutexUnlock(&pWal->mutex); return -1; } if (head.head.version != ver) { - ASSERT(0); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; taosThreadMutexUnlock(&pWal->mutex); return -1; @@ -202,22 +191,22 @@ int32_t walRollback(SWal *pWal, int64_t ver) { // truncate old files code = taosFtruncateFile(pLogFile, entry.offset); if (code < 0) { - ASSERT(0); terrno = TAOS_SYSTEM_ERROR(errno); taosThreadMutexUnlock(&pWal->mutex); return -1; } code = taosFtruncateFile(pIdxFile, idxOff); if (code < 0) { - ASSERT(0); terrno = TAOS_SYSTEM_ERROR(errno); taosThreadMutexUnlock(&pWal->mutex); return -1; } pWal->vers.lastVer = ver - 1; +#if 0 if (pWal->vers.lastVer < pWal->vers.firstVer) { - ASSERT(pWal->vers.lastVer == pWal->vers.firstVer - 1); + A(pWal->vers.lastVer == pWal->vers.firstVer - 1); } +#endif ((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1; ((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset; taosCloseFile(&pIdxFile); @@ -386,7 +375,8 @@ int32_t walEndSnapshot(SWal *pWal) { walBuildIdxName(pWal, pInfo->firstVer, fnameStr); wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr); if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) { - ASSERT(0); + wError("vgId:%d, failed to remove idx file %s due to %s", pWal->cfg.vgId, fnameStr, strerror(errno)); + goto END; } } taosArrayClear(pWal->toDeleteFiles); @@ -441,7 +431,6 @@ int32_t walRollImpl(SWal *pWal) { pWal->pIdxFile = pIdxFile; pWal->pLogFile = pLogFile; pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; - ASSERT(pWal->writeCur >= 0); pWal->lastRollSeq = walGetSeq(); @@ -458,8 +447,7 @@ END: static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { SWalIdxEntry entry = {.ver = ver, .offset = offset}; SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal); - ASSERT(pFileInfo != NULL); - ASSERT(pFileInfo->firstVer >= 0); + int64_t idxOffset = (entry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry); wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset, idxOffset); @@ -476,7 +464,6 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { if (endOffset < 0) { wFatal("vgId:%d, failed to seek end of idxfile due to %s. ver:%" PRId64 "", pWal->cfg.vgId, strerror(errno), ver); } - ASSERT(endOffset == idxOffset + sizeof(SWalIdxEntry) && "Offset of idx entries misaligned"); return 0; } @@ -486,9 +473,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy int64_t offset = walGetCurFileOffset(pWal); SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal); - ASSERT(pFileInfo != NULL); - ASSERT(pFileInfo->firstVer != -1); pWal->writeHead.head.version = index; pWal->writeHead.head.bodyLen = bodyLen; pWal->writeHead.head.msgType = msgType; @@ -525,7 +510,6 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy // set status if (pWal->vers.firstVer == -1) { - ASSERT(index == 0); pWal->vers.firstVer = 0; } pWal->vers.lastVer = index; @@ -541,7 +525,6 @@ END: wFatal("vgId:%d, failed to ftruncate logfile to offset:%" PRId64 " during recovery due to %s", pWal->cfg.vgId, offset, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); - ASSERT(0 && "failed to recover from error"); } int64_t idxOffset = (index - pFileInfo->firstVer) * sizeof(SWalIdxEntry); @@ -549,7 +532,6 @@ END: wFatal("vgId:%d, failed to ftruncate idxfile to offset:%" PRId64 "during recovery due to %s", pWal->cfg.vgId, idxOffset, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); - ASSERT(0 && "failed to recover from error"); } return -1; } @@ -576,8 +558,6 @@ int64_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syn } } - ASSERT(pWal->pLogFile != NULL && pWal->pIdxFile != NULL && pWal->writeCur >= 0); - if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) { taosThreadMutexUnlock(&pWal->mutex); return -1; @@ -614,8 +594,6 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSync } } - ASSERT(pWal->pIdxFile != NULL && pWal->pLogFile != NULL && pWal->writeCur >= 0); - if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) { taosThreadMutexUnlock(&pWal->mutex); return -1;