diff --git a/include/common/tcommon.h b/include/common/tcommon.h index f74795a250..aad69862f0 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -195,7 +195,7 @@ typedef struct SDataBlockInfo { uint32_t capacity; SBlockID id; int16_t hasVarCol; - int16_t dataLoad; // denote if the data is loaded or not + int16_t dataLoad; // denote if the data is loaded or not // TODO: optimize and remove following int64_t version; // used for stream, and need serialization @@ -204,8 +204,9 @@ typedef struct SDataBlockInfo { STimeWindow calWin; // used for stream, do not serialize TSKEY watermark; // used for stream - char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition - STag* pTag; // used for stream partition + char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition + int32_t tagLen; + void* pTag; // used for stream partition } SDataBlockInfo; typedef struct SSDataBlock { @@ -239,22 +240,22 @@ typedef struct SVarColAttr { // pBlockAgg->numOfNull == info.rows, all data are null // pBlockAgg->numOfNull == 0, no data are null. typedef struct SColumnInfoData { - char* pData; // the corresponding block data in memory + char* pData; // the corresponding block data in memory union { char* nullbitmap; // bitmap, one bit for each item in the list SVarColAttr varmeta; }; - SColumnInfo info; // column info - bool hasNull; // if current column data has null value. + SColumnInfo info; // column info + bool hasNull; // if current column data has null value. } SColumnInfoData; typedef struct SQueryTableDataCond { uint64_t suid; - int32_t order; // desc|asc order to iterate the data block + int32_t order; // desc|asc order to iterate the data block int32_t numOfCols; SColumnInfo* colList; - int32_t* pSlotList; // the column output destation slot, and it may be null - int32_t type; // data block load type: + int32_t* pSlotList; // the column output destation slot, and it may be null + int32_t type; // data block load type: STimeWindow twindows; int64_t startVersion; int64_t endVersion; diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 8fdac0da7f..d7bc151ecc 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -35,6 +35,7 @@ typedef struct STdbState { TTB* pFillStateDb; // todo refactor TTB* pSessionStateDb; TTB* pParNameDb; + TTB* pParTagDb; TXN* txn; } STdbState; @@ -108,6 +109,9 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname); int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal); +int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen); +int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen); + #if 0 char* streamStateSessionDump(SStreamState* pState); #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4a941b3c20..222339d387 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -92,21 +92,21 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo); if (tqMetaOpen(pTq) < 0) { - ASSERT(0); + return NULL; } pTq->pOffsetStore = tqOffsetOpen(pTq); if (pTq->pOffsetStore == NULL) { - ASSERT(0); + return NULL; } pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId); if (pTq->pStreamMeta == NULL) { - ASSERT(0); + return NULL; } if (streamLoadTasks(pTq->pStreamMeta) < 0) { - ASSERT(0); + return NULL; } return pTq; @@ -166,19 +166,17 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { SMqDataRsp* pRsp = &pPushEntry->dataRsp; - ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); - ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); +#if 0 + A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); + A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); - ASSERT(!pRsp->withSchema); - ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); + A(!pRsp->withSchema); + A(taosArrayGetSize(pRsp->blockSchema) == 0); if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { - /*if (pRsp->blockNum > 0) {*/ - /*ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);*/ - /*} else {*/ - ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version); - /*}*/ + A(pRsp->rspOffset.version > pRsp->reqOffset.version); } +#endif int32_t len = 0; int32_t code = 0; @@ -223,19 +221,21 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { } int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) { - ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); - ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); +#if 0 + A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); + A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); - ASSERT(!pRsp->withSchema); - ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); + A(!pRsp->withSchema); + A(taosArrayGetSize(pRsp->blockSchema) == 0); if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { if (pRsp->blockNum > 0) { - ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version); + A(pRsp->rspOffset.version > pRsp->reqOffset.version); } else { - ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version); + A(pRsp->rspOffset.version >= pRsp->reqOffset.version); } } +#endif int32_t len = 0; int32_t code = 0; @@ -279,22 +279,24 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con } int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) { - ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); - ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); +#if 0 + A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); + A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); if (pRsp->withSchema) { - ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum); + A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum); } else { - ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); + A(taosArrayGetSize(pRsp->blockSchema) == 0); } if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { if (pRsp->blockNum > 0) { - ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version); + A(pRsp->rspOffset.version > pRsp->reqOffset.version); } else { - ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version); + A(pRsp->rspOffset.version >= pRsp->reqOffset.version); } } +#endif int32_t len = 0; int32_t code = 0; @@ -348,7 +350,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m SDecoder decoder; tDecoderInit(&decoder, msg, msgLen); if (tDecodeSTqOffset(&decoder, &offset) < 0) { - ASSERT(0); return -1; } tDecoderClear(&decoder); @@ -362,8 +363,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m if (offset.val.version + 1 == version) { offset.val.version += 1; } - } else { - ASSERT(0); + /*} else {*/ + /*A(0);*/ } STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey); if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) { @@ -371,7 +372,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m } if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { - ASSERT(0); return -1; } @@ -434,7 +434,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su } #endif - ASSERT(subType == TOPIC_SUB_TYPE__COLUMN); + /*A(subType == TOPIC_SUB_TYPE__COLUMN);*/ pRsp->withSchema = false; return 0; @@ -473,7 +473,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { // 1.find handle STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); - /*ASSERT(pHandle);*/ if (pHandle == NULL) { tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId, TD_VID(pTq->pVnode), req.subKey); @@ -560,7 +559,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType); // lock taosWLockLatch(&pTq->pushLock); - tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); + if (tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) { + return -1; + } #if 1 if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && @@ -599,7 +600,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } // for taosx - ASSERT(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN); + /*A(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN);*/ SMqMetaRsp metaRsp = {0}; @@ -607,7 +608,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqInitTaosxRsp(&taosxRsp, &req); if (fetchOffsetNew.type != TMQ_OFFSET__LOG) { - tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew); + if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew) < 0) { + return -1; + } if (metaRsp.metaRspLen > 0) { if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) { @@ -690,8 +693,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } } else { - ASSERT(pHandle->fetchMeta); - ASSERT(IS_META_MSG(pHead->msgType)); + /*A(pHandle->fetchMeta);*/ + /*A(IS_META_MSG(pHead->msgType));*/ tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); metaRsp.resMsgType = pHead->msgType; @@ -808,7 +811,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL // TODO version should be assigned and refed during preprocess SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal); if (pRef == NULL) { - ASSERT(0); return -1; } int64_t ver = pRef->refVer; @@ -829,12 +831,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, NULL); - ASSERT(pHandle->execHandle.task); + /*A(pHandle->execHandle.task);*/ void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.task, &scanner); - ASSERT(scanner); + /*A(scanner);*/ pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); - ASSERT(pHandle->execHandle.pExecReader); + /*A(pHandle->execHandle.pExecReader);*/ } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); @@ -867,19 +869,14 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { - // TODO - ASSERT(0); } } else { - /*ASSERT(pExec->consumerId == req.oldConsumerId);*/ // TODO handle qmsg and exec modification atomic_store_32(&pHandle->epoch, -1); atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); taosMemoryFree(req.qmsg); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { - // TODO - ASSERT(0); } // close handle } @@ -888,9 +885,11 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { +#if 0 if (pTask->taskLevel == TASK_LEVEL__AGG) { - ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); + A(taosArrayGetSize(pTask->childEpInfo) != 0); } +#endif pTask->refCnt = 1; pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; @@ -927,7 +926,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { .pStateBackend = pTask->pState, }; pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); - ASSERT(pTask->exec.executor); + if (pTask->exec.executor == NULL) { + return -1; + } } else if (pTask->taskLevel == TASK_LEVEL__AGG) { pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); @@ -940,7 +941,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { .pStateBackend = pTask->pState, }; pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle); - ASSERT(pTask->exec.executor); + if (pTask->exec.executor == NULL) { + return -1; + } } // sink @@ -952,12 +955,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->tbSink.vnode = pTq->pVnode; pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline; - ASSERT(pTask->tbSink.pSchemaWrapper); - ASSERT(pTask->tbSink.pSchemaWrapper->pSchema); + /*A(pTask->tbSink.pSchemaWrapper);*/ + /*A(pTask->tbSink.pSchemaWrapper->pSchema);*/ pTask->tbSink.pTSchema = tdGetSTSChemaFromSSChema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, 1); - ASSERT(pTask->tbSink.pTSchema); + /*A(pTask->tbSink.pTSchema);*/ } streamSetupTrigger(pTask); @@ -1003,7 +1006,8 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { int32_t len; tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code); if (code < 0) { - ASSERT(0); + tqError("unable to encode rsp %d", __LINE__); + return -1; } void* buf = rpcMallocCont(sizeof(SMsgHead) + len); ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId); @@ -1096,12 +1100,10 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { if (pTask == NULL) { return -1; } - ASSERT(pReq->taskId == pTask->taskId); // check param int64_t fillVer1 = pTask->startVer; if (fillVer1 <= 0) { - ASSERT(0); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return -1; } @@ -1296,7 +1298,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { } int32_t ref = atomic_sub_fetch_32(pRef, 1); - ASSERT(ref >= 0); + /*A(ref >= 0);*/ if (ref == 0) { blockDataDestroy(pDelBlock); taosMemoryFree(pRef); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 093186ebbb..426fceb7ed 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -29,7 +29,6 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); actualLen += sizeof(SRetrieveTableRsp); - ASSERT(actualLen <= dataStrLen); taosArrayPush(pRsp->blockDataLen, &actualLen); taosArrayPush(pRsp->blockData, &buf); return 0; @@ -62,7 +61,6 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { const STqExecHandle* pExec = &pHandle->execHandle; - ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); qTaskInfo_t task = pExec->task; @@ -87,7 +85,8 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs uint64_t ts = 0; tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId); if (qExecTask(task, &pDataBlock, &ts) < 0) { - ASSERT(0); + tqError("vgId:%d task exec error since %s", pTq->pVnode->config.vgId, terrstr()); + return -1; } tqDebug("vgId:%d, tmq task executed, get %p", pTq->pVnode->config.vgId, pDataBlock); @@ -105,10 +104,14 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs } if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) { - ASSERT(0); return -1; } - ASSERT(pRsp->rspOffset.type != 0); + + if (pRsp->rspOffset.type == 0) { + tqError("expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, pRsp->rspOffset.type, pRsp->rspOffset.ts, + pRsp->rspOffset.uid, pRsp->rspOffset.version); + return -1; + } if (pRsp->withTbName) { if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) { @@ -118,7 +121,6 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs pRsp->withTbName = false; } } - ASSERT(pRsp->withSchema == false); return 0; } @@ -148,7 +150,8 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta uint64_t ts = 0; tqDebug("tmqsnap task start to execute"); if (qExecTask(task, &pDataBlock, &ts) < 0) { - ASSERT(0); + tqError("vgId:%d task exec error since %s", pTq->pVnode->config.vgId, terrstr()); + return -1; } tqDebug("tmqsnap task execute end, get %p", pDataBlock); @@ -215,17 +218,20 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta break; } - if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) { - ASSERT(0); + qStreamExtractOffset(task, &pRsp->rspOffset); + + if (pRsp->rspOffset.type == 0) { + tqError("expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, pRsp->rspOffset.type, pRsp->rspOffset.ts, + pRsp->rspOffset.uid, pRsp->rspOffset.version); + return -1; } - ASSERT(pRsp->rspOffset.type != 0); return 0; } int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp) { STqExecHandle* pExec = &pHandle->execHandle; - ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); + /*A(pExec->subType != TOPIC_SUB_TYPE__COLUMN);*/ SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pSchemas = taosArrayInit(0, sizeof(void*)); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index f476f58b56..05ed8d7348 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -71,17 +71,14 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { int32_t tqMetaOpen(STQ* pTq) { if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0) < 0) { - ASSERT(0); return -1; } if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0) < 0) { - ASSERT(0); return -1; } if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0) < 0) { - ASSERT(0); return -1; } @@ -135,19 +132,19 @@ int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) { if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - ASSERT(0); + return -1; } if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), txn) < 0) { - /*ASSERT(0);*/ + tqWarn("vgId:%d, tq try delete checkinfo failed %s", pTq->pVnode->config.vgId, key); } if (tdbCommit(pTq->pMetaDB, txn) < 0) { - ASSERT(0); + return -1; } if (tdbPostCommit(pTq->pMetaDB, txn) < 0) { - ASSERT(0); + return -1; } return 0; @@ -156,7 +153,6 @@ int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) { int32_t tqMetaRestoreCheckInfo(STQ* pTq) { TBC* pCur = NULL; if (tdbTbcOpen(pTq->pCheckStore, &pCur, NULL) < 0) { - ASSERT(0); return -1; } @@ -197,40 +193,42 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { int32_t code; int32_t vlen; tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code); - ASSERT(code == 0); + if (code < 0) { + return -1; + } tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey), pHandle->consumerId, TD_VID(pTq->pVnode)); void* buf = taosMemoryCalloc(1, vlen); if (buf == NULL) { - ASSERT(0); + return -1; } SEncoder encoder; tEncoderInit(&encoder, buf, vlen); if (tEncodeSTqHandle(&encoder, pHandle) < 0) { - ASSERT(0); + return -1; } TXN* txn; if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - ASSERT(0); + return -1; } if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn) < 0) { - ASSERT(0); + return -1; } if (tdbCommit(pTq->pMetaDB, txn) < 0) { - ASSERT(0); + return -1; } if (tdbPostCommit(pTq->pMetaDB, txn) < 0) { - ASSERT(0); + return -1; } tEncoderClear(&encoder); @@ -243,19 +241,18 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - ASSERT(0); + return -1; } if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), txn) < 0) { - /*ASSERT(0);*/ } if (tdbCommit(pTq->pMetaDB, txn) < 0) { - ASSERT(0); + return -1; } if (tdbPostCommit(pTq->pMetaDB, txn) < 0) { - ASSERT(0); + return -1; } return 0; @@ -264,7 +261,6 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { int32_t tqMetaRestoreHandle(STQ* pTq) { TBC* pCur = NULL; if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) { - ASSERT(0); return -1; } @@ -284,7 +280,6 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { handle.pRef = walOpenRef(pTq->pVnode->pWal); if (handle.pRef == NULL) { - ASSERT(0); return -1; } walRefVer(handle.pRef, handle.snapshotVer); @@ -300,12 +295,19 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { handle.execHandle.task = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, NULL); - ASSERT(handle.execHandle.task); + if (handle.execHandle.task == NULL) { + tqError("cannot create exec task for %s", handle.subKey); + return -1; + } void* scanner = NULL; qExtractStreamScanner(handle.execHandle.task, &scanner); - ASSERT(scanner); + if (scanner == NULL) { + tqError("cannot extract stream scanner for %s", handle.subKey); + } handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); - ASSERT(handle.execHandle.pExecReader); + if (handle.execHandle.pExecReader == NULL) { + tqError("cannot extract exec reader for %s", handle.subKey); + } } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) { handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode); diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index dd56c165fd..5a4d414ab7 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -40,26 +40,23 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { if (code == 0) { break; } else { - ASSERT(0); - // TODO handle error + return -1; } } int32_t size = htonl(head.size); void* memBuf = taosMemoryCalloc(1, size); if ((code = taosReadFile(pFile, memBuf, size)) != size) { - ASSERT(0); - // TODO handle error + return -1; } STqOffset offset; SDecoder decoder; tDecoderInit(&decoder, memBuf, size); if (tDecodeSTqOffset(&decoder, &offset) < 0) { - ASSERT(0); + return -1; } tDecoderClear(&decoder); if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) { - ASSERT(0); - // TODO + return -1; } taosMemoryFree(memBuf); } @@ -85,7 +82,9 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { } char* fname = tqOffsetBuildFName(pStore->pTq->path, 0); if (tqOffsetRestoreFromFile(pStore, fname) < 0) { - ASSERT(0); + taosMemoryFree(fname); + taosMemoryFree(pStore); + return NULL; } taosMemoryFree(fname); return pStore; @@ -124,7 +123,6 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { const char* sysErrStr = strerror(errno); tqError("vgId:%d, cannot open file %s when commit offset since %s", pStore->pTq->pVnode->config.vgId, fname, sysErrStr); - ASSERT(0); return -1; } taosMemoryFree(fname); @@ -136,9 +134,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { int32_t bodyLen; int32_t code; tEncodeSize(tEncodeSTqOffset, pOffset, bodyLen, code); - ASSERT(code == 0); if (code < 0) { - ASSERT(0); taosHashCancelIterate(pStore->pHash, pIter); return -1; } @@ -154,7 +150,6 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { // write file int64_t writeLen; if ((writeLen = taosWriteFile(pFile, buf, totLen)) != totLen) { - ASSERT(0); tqError("write offset incomplete, len %d, write len %" PRId64, bodyLen, writeLen); taosHashCancelIterate(pStore->pHash, pIter); taosMemoryFree(buf); diff --git a/source/dnode/vnode/src/tq/tqOffsetSnapshot.c b/source/dnode/vnode/src/tq/tqOffsetSnapshot.c index b63ff8af1d..2413a792c6 100644 --- a/source/dnode/vnode/src/tq/tqOffsetSnapshot.c +++ b/source/dnode/vnode/src/tq/tqOffsetSnapshot.c @@ -56,24 +56,28 @@ int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData) { TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); if (pFile == NULL) { taosMemoryFree(fname); - return 0; + return -1; } int64_t sz = 0; if (taosStatFile(fname, &sz, NULL) < 0) { - ASSERT(0); + taosCloseFile(&pFile); + taosMemoryFree(fname); + return -1; } taosMemoryFree(fname); SSnapDataHdr* buf = taosMemoryCalloc(1, sz + sizeof(SSnapDataHdr)); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + taosCloseFile(&pFile); return terrno; } void* abuf = POINTER_SHIFT(buf, sizeof(SSnapDataHdr)); int64_t contLen = taosReadFile(pFile, abuf, sz); if (contLen != sz) { - ASSERT(0); + taosCloseFile(&pFile); + taosMemoryFree(buf); return -1; } buf->size = sz; @@ -122,14 +126,17 @@ int32_t tqOffsetWriterClose(STqOffsetWriter** ppWriter, int8_t rollback) { if (rollback) { if (taosRemoveFile(pWriter->fname) < 0) { - ASSERT(0); + taosMemoryFree(fname); + return -1; } } else { if (taosRenameFile(pWriter->fname, fname) < 0) { - ASSERT(0); + taosMemoryFree(fname); + return -1; } if (tqOffsetRestoreFromFile(pTq->pOffsetStore, fname) < 0) { - ASSERT(0); + taosMemoryFree(fname); + return -1; } } taosMemoryFree(fname); @@ -146,14 +153,13 @@ int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nDa TdFilePtr pFile = taosOpenFile(pWriter->fname, TD_FILE_CREATE | TD_FILE_WRITE); SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; int64_t size = pHdr->size; - ASSERT(size == nData - sizeof(SSnapDataHdr)); if (pFile) { int64_t contLen = taosWriteFile(pFile, pHdr->data, size); if (contLen != size) { - ASSERT(0); + taosCloseFile(&pFile); + return -1; } } else { - ASSERT(0); return -1; } return 0; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index f89bc20362..63c3c25218 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -25,9 +25,7 @@ void tqTmrRspFunc(void* param, void* tmrId) { static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubmit** ppSubmit, SMqDataRsp* pRsp) { SStreamDataSubmit* pSubmit = *ppSubmit; while (pSubmit != NULL) { - ASSERT(pSubmit->ver == pHandle->pushHandle.processedVer + 1); if (tqLogScanExec(pTq, &pHandle->execHandle, pSubmit->data, pRsp, 0) < 0) { - /*ASSERT(0);*/ } // update processed atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver); @@ -160,8 +158,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ if (msgType == TDMT_VND_SUBMIT) { tqLogScanExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId); } else { - // TODO - ASSERT(0); + tqError("tq push unexpected msg type %d", msgType); } if (rsp.blockNum == 0) { @@ -169,9 +166,6 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ continue; } - ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum); - ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum); - rsp.rspOffset = fetchOffset; int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp); @@ -263,7 +257,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; if (qExecTask(task, &pDataBlock, &ts) < 0) { - ASSERT(0); + tqDebug("vgId:%d, tq exec error since %s", pTq->pVnode->config.vgId, terrstr()); } if (pDataBlock == NULL) { @@ -282,7 +276,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) // remove from hash size_t kLen; void* key = taosHashGetKey(pIter, &kLen); - void* keyCopy = taosMemoryMalloc(kLen); + void* keyCopy = taosMemoryCalloc(1, kLen + 1); memcpy(keyCopy, key, kLen); taosArrayPush(cachedKeys, &keyCopy); @@ -296,7 +290,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) void* key = taosArrayGetP(cachedKeys, i); size_t kLen = *(size_t*)taosArrayGet(cachedKeyLens, i); if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) { - ASSERT(0); + tqError("vgId:%d, tq push hash remove key error, key: %s", pTq->pVnode->config.vgId, (char*)key); } } taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 46b31bc5b0..e5392bbd02 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -176,8 +176,6 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) { goto end; } realTbSuid = req.suid; - } else { - ASSERT(0); } end: @@ -206,7 +204,6 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea code = walFetchBody(pHandle->pWalReader, ppCkHead); if (code < 0) { - ASSERT(0); *fetchOffset = offset; code = -1; goto END; @@ -220,7 +217,6 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea if (IS_META_MSG(pHead->msgType)) { code = walFetchBody(pHandle->pWalReader, ppCkHead); if (code < 0) { - ASSERT(0); *fetchOffset = offset; code = -1; goto END; @@ -238,7 +234,6 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea } code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead); if (code < 0) { - ASSERT(0); *fetchOffset = offset; code = -1; goto END; @@ -297,11 +292,8 @@ void tqCloseReader(STqReader* pReader) { int32_t tqSeekVer(STqReader* pReader, int64_t ver) { if (walReadSeekVer(pReader->pWalReader, ver) < 0) { - ASSERT(pReader->pWalReader->curInvalid); - ASSERT(pReader->pWalReader->curVersion == ver); return -1; } - ASSERT(pReader->pWalReader->curVersion == ver); return 0; } @@ -317,7 +309,6 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ret->offset.version = pReader->ver; ret->fetchType = FETCH_TYPE__NONE; tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version); - ASSERT(ret->offset.version >= 0); return -1; } void* body = pReader->pWalReader->pHead->head.body; @@ -340,7 +331,6 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { memset(&ret->data, 0, sizeof(SSDataBlock)); int32_t code = tqRetrieveDataBlock(&ret->data, pReader); if (code != 0 || ret->data.info.rows == 0) { - ASSERT(0); continue; } ret->fetchType = FETCH_TYPE__DATA; @@ -351,7 +341,6 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { if (fromProcessedMsg) { ret->offset.type = TMQ_OFFSET__LOG; ret->offset.version = pReader->ver; - ASSERT(pReader->ver >= 0); ret->fetchType = FETCH_TYPE__SEP; tqDebug("return offset %" PRId64 ", processed finish", ret->offset.version); return 0; @@ -434,7 +423,6 @@ bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) { } if (pHandle->pBlock == NULL) return false; - ASSERT(pHandle->tbIdHash == NULL); void* ret = taosHashGet(filterOutUids, &pHandle->msgIter.uid, sizeof(int64_t)); if (ret == NULL) { return true; @@ -453,7 +441,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) { if (pReader->pSchema == NULL) { tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table", pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer); - /*ASSERT(0);*/ pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; @@ -464,7 +451,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) { if (pReader->pSchemaWrapper == NULL) { tqWarn("cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", pReader->msgIter.uid, pReader->cachedSchemaVer); - /*ASSERT(0);*/ pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; @@ -567,7 +553,6 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas if (pReader->pSchema == NULL) { tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table", pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer); - /*ASSERT(0);*/ pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; @@ -578,7 +563,6 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas if (pReader->pSchemaWrapper == NULL) { tqWarn("cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", pReader->msgIter.uid, pReader->cachedSchemaVer); - /*ASSERT(0);*/ pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; @@ -671,8 +655,6 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas break; } - ASSERT(sVal.valType != TD_VTYPE_NONE); - if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) { goto FAIL; } @@ -732,8 +714,6 @@ int tqReaderAddTbUidList(STqReader* pReader, const SArray* tbUidList) { } int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) { - ASSERT(pReader->tbIdHash != NULL); - for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i); taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t)); @@ -750,7 +730,10 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { STqHandle* pExec = (STqHandle*)pIter; if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { int32_t code = qUpdateQualifiedTableId(pExec->execHandle.task, tbUidList, isAdd); - ASSERT(code == 0); + if (code != 0) { + tqError("update qualified table error for %s", pExec->subKey); + continue; + } } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) { if (!isAdd) { int32_t sz = taosArrayGetSize(tbUidList); @@ -769,7 +752,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { int32_t code = metaGetTableEntryByUidCache(&mr, *id); if (code != TSDB_CODE_SUCCESS) { - qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno)); + tqError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno)); continue; } @@ -790,8 +773,6 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } else { // TODO handle delete table from stb } - } else { - ASSERT(0); } } while (1) { @@ -800,7 +781,10 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->taskLevel == TASK_LEVEL__SOURCE) { int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd); - ASSERT(code == 0); + if (code != 0) { + tqError("update qualified table error for stream task %d", pTask->taskId); + continue; + } } } return 0; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 5907be576a..078b50db68 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -19,7 +19,6 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq) { - ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT); int32_t totRow = pDataBlock->info.rows; SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -334,8 +333,8 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d int32_t code; tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); if (code < 0) { - // - ASSERT(0); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return; } SEncoder encoder; void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); @@ -559,7 +558,6 @@ void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, (int32_t)pRes->size); - ASSERT(pTask->tbSink.pTSchema); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, pTask->tbSink.pSchemaWrapper, true, pTask->tbSink.stbUid, pTask->tbSink.stbFullName, &deleteReq); @@ -570,10 +568,6 @@ void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data int32_t code; int32_t len; tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); - if (code < 0) { - // - ASSERT(0); - } SEncoder encoder; void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)); diff --git a/source/dnode/vnode/src/tq/tqSnapshot.c b/source/dnode/vnode/src/tq/tqSnapshot.c index d811d943ed..ab7093a701 100644 --- a/source/dnode/vnode/src/tq/tqSnapshot.c +++ b/source/dnode/vnode/src/tq/tqSnapshot.c @@ -100,8 +100,6 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) { } } - ASSERT(pVal && vLen); - *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); if (*ppData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index b1f00bdf74..ab7093a701 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -100,8 +100,6 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) { } } - ASSERT(pVal && vLen); - *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); if (*ppData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -168,7 +166,6 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { if (rollback) { tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn); - ASSERT(0); } else { code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn); if (code) goto _err; diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 305378bc93..ab7093a701 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -100,8 +100,6 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) { } } - ASSERT(pVal && vLen); - *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); if (*ppData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -146,7 +144,7 @@ int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** p pWriter->sver = sver; pWriter->ever = ever; - if (tdbBegin(pTq->pMetaStore, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { + if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { code = -1; taosMemoryFree(pWriter); goto _err; @@ -167,12 +165,11 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { STQ* pTq = pWriter->pTq; if (rollback) { - tdbAbort(pWriter->pTq->pMetaStore, pWriter->txn); - ASSERT(0); + tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn); } else { - code = tdbCommit(pWriter->pTq->pMetaStore, pWriter->txn); + code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn); if (code) goto _err; - code = tdbPostCommit(pWriter->pTq->pMetaStore, pWriter->txn); + code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn); if (code) goto _err; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d074ceede8..1414d3b4ab 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -768,8 +768,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num); ASSERT(pInfo->base.dataReader == NULL); - int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, - pInfo->pResBlock, (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo)); + int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, + (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -986,8 +986,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU SSDataBlock* pBlock = pTableScanInfo->pResBlock; STsdbReader* pReader = NULL; - int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, (STsdbReader**)&pReader, - GET_TASKID(pTaskInfo)); + int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, + (STsdbReader**)&pReader, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { terrno = code; T_LONG_JMP(pTaskInfo->env, code); @@ -995,7 +995,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU } if (tsdbNextDataBlock(pReader)) { - /*SSDataBlock* p = */tsdbRetrieveDataBlock(pReader, NULL); + /*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL); doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); } @@ -1224,7 +1224,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX); - uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData; + uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData; ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; @@ -1347,6 +1347,36 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, return code; } +#if 0 +void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { + SExprSupp* pTagCalSup = &pInfo->tagCalSup; + SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState; + if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return; + if (pBlock == NULL || pBlock->info.rows == 0) return; + + void* tag = NULL; + int32_t tagLen = 0; + if (streamStateGetParTag(pState, pBlock->info.id.groupId, &tag, &tagLen) == 0) { + pBlock->info.tagLen = tagLen; + void* pTag = taosMemoryRealloc(pBlock->info.pTag, tagLen); + if (pTag == NULL) { + tdbFree(tag); + taosMemoryFree(pBlock->info.pTag); + pBlock->info.pTag = NULL; + pBlock->info.tagLen = 0; + return; + } + pBlock->info.pTag = pTag; + memcpy(pBlock->info.pTag, tag, tagLen); + tdbFree(tag); + return; + } else { + pBlock->info.pTag = NULL; + } + tdbFree(tag); +} +#endif + void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup; SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState; @@ -1354,10 +1384,12 @@ void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { if (pBlock == NULL || pBlock->info.rows == 0) return; void* tbname = NULL; - if (streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { - pBlock->info.parTbName[0] = 0; - } else { + if (streamStateGetParName(pState, pBlock->info.id.groupId, &tbname) == 0) { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + tdbFree(tbname); + return; + } else { + pBlock->info.parTbName[0] = 0; } tdbFree(tbname); @@ -2285,7 +2317,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (pHandle->initTableReader) { pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->base.dataReader = NULL; - code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock, &pTSInfo->base.dataReader, NULL); + code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock, + &pTSInfo->base.dataReader, NULL); if (code != 0) { terrno = code; destroyTableScanOperatorInfo(pTableScanOp); @@ -2355,7 +2388,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL); + pOperator->fpSet = + createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL); return pOperator; @@ -2492,7 +2526,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL); + pOperator->fpSet = + createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL); return pOperator; @@ -2513,11 +2548,12 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx); - int64_t st = taosGetTimestampUs(); - void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); + int64_t st = taosGetTimestampUs(); + void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); SReadHandle* pHandle = &pInfo->base.readHandle; - int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo)); + int32_t code = + tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo)); if (code != 0) { T_LONG_JMP(pTaskInfo->env, code); } @@ -2915,8 +2951,8 @@ static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* SSDataBlock* pRes, char* dbName); static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName); -static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, - STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName); +static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, + STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName); static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes); static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, @@ -3041,8 +3077,8 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, optrDefaultBufFn, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, + optrDefaultBufFn, NULL); return pOperator; _error: diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index af1d738de0..cf388da92c 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -159,6 +159,11 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int goto _err; } + if (tdbTbOpen("partag.state.db", sizeof(int64_t), -1, NULL, pState->pTdbState->db, &pState->pTdbState->pParTagDb, 0) < + 0) { + goto _err; + } + if (streamStateBegin(pState) < 0) { goto _err; } @@ -173,6 +178,7 @@ _err: tdbTbClose(pState->pTdbState->pFillStateDb); tdbTbClose(pState->pTdbState->pSessionStateDb); tdbTbClose(pState->pTdbState->pParNameDb); + tdbTbClose(pState->pTdbState->pParTagDb); tdbClose(pState->pTdbState->db); streamStateDestroy(pState); return NULL; @@ -186,6 +192,7 @@ void streamStateClose(SStreamState* pState) { tdbTbClose(pState->pTdbState->pFillStateDb); tdbTbClose(pState->pTdbState->pSessionStateDb); tdbTbClose(pState->pTdbState->pParNameDb); + tdbTbClose(pState->pTdbState->pParTagDb); tdbClose(pState->pTdbState->db); streamStateDestroy(pState); @@ -821,10 +828,17 @@ _end: return res; } +int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) { + return tdbTbUpsert(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tag, tagLen, pState->pTdbState->txn); +} + +int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) { + return tdbTbGet(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tagVal, tagLen); +} + int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { - tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, - pState->pTdbState->txn); - return 0; + return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, + pState->pTdbState->txn); } int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) { diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index e86111109c..2200454018 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -80,6 +80,7 @@ void walUnrefVer(SWalRef *pRef) { SWalRef *walRefCommittedVer(SWal *pWal) { SWalRef *pRef = walOpenRef(pWal); if (pRef == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } taosThreadMutexLock(&pWal->mutex);