Merge pull request #19059 from taosdata/feature/stream
refactor: remove assert
This commit is contained in:
commit
13dfaaea59
|
@ -195,7 +195,7 @@ typedef struct SDataBlockInfo {
|
|||
uint32_t capacity;
|
||||
SBlockID id;
|
||||
int16_t hasVarCol;
|
||||
int16_t dataLoad; // denote if the data is loaded or not
|
||||
int16_t dataLoad; // denote if the data is loaded or not
|
||||
|
||||
// TODO: optimize and remove following
|
||||
int64_t version; // used for stream, and need serialization
|
||||
|
@ -204,8 +204,9 @@ typedef struct SDataBlockInfo {
|
|||
STimeWindow calWin; // used for stream, do not serialize
|
||||
TSKEY watermark; // used for stream
|
||||
|
||||
char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition
|
||||
STag* pTag; // used for stream partition
|
||||
char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition
|
||||
int32_t tagLen;
|
||||
void* pTag; // used for stream partition
|
||||
} SDataBlockInfo;
|
||||
|
||||
typedef struct SSDataBlock {
|
||||
|
@ -239,22 +240,22 @@ typedef struct SVarColAttr {
|
|||
// pBlockAgg->numOfNull == info.rows, all data are null
|
||||
// pBlockAgg->numOfNull == 0, no data are null.
|
||||
typedef struct SColumnInfoData {
|
||||
char* pData; // the corresponding block data in memory
|
||||
char* pData; // the corresponding block data in memory
|
||||
union {
|
||||
char* nullbitmap; // bitmap, one bit for each item in the list
|
||||
SVarColAttr varmeta;
|
||||
};
|
||||
SColumnInfo info; // column info
|
||||
bool hasNull; // if current column data has null value.
|
||||
SColumnInfo info; // column info
|
||||
bool hasNull; // if current column data has null value.
|
||||
} SColumnInfoData;
|
||||
|
||||
typedef struct SQueryTableDataCond {
|
||||
uint64_t suid;
|
||||
int32_t order; // desc|asc order to iterate the data block
|
||||
int32_t order; // desc|asc order to iterate the data block
|
||||
int32_t numOfCols;
|
||||
SColumnInfo* colList;
|
||||
int32_t* pSlotList; // the column output destation slot, and it may be null
|
||||
int32_t type; // data block load type:
|
||||
int32_t* pSlotList; // the column output destation slot, and it may be null
|
||||
int32_t type; // data block load type:
|
||||
STimeWindow twindows;
|
||||
int64_t startVersion;
|
||||
int64_t endVersion;
|
||||
|
|
|
@ -35,6 +35,7 @@ typedef struct STdbState {
|
|||
TTB* pFillStateDb; // todo refactor
|
||||
TTB* pSessionStateDb;
|
||||
TTB* pParNameDb;
|
||||
TTB* pParTagDb;
|
||||
TXN* txn;
|
||||
} STdbState;
|
||||
|
||||
|
@ -108,6 +109,9 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
|
|||
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname);
|
||||
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal);
|
||||
|
||||
int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen);
|
||||
int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen);
|
||||
|
||||
#if 0
|
||||
char* streamStateSessionDump(SStreamState* pState);
|
||||
#endif
|
||||
|
|
|
@ -530,7 +530,6 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
|
|||
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
|
||||
char* topic;
|
||||
int32_t vgId;
|
||||
ASSERT(msg != NULL);
|
||||
if (TD_RES_TMQ(msg)) {
|
||||
SMqRspObj* pRspObj = (SMqRspObj*)msg;
|
||||
topic = pRspObj->topic;
|
||||
|
@ -809,8 +808,6 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
|
|||
|
||||
taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId, tmqMgmt.timer, &tmq->commitTimer);
|
||||
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
taosFreeQitem(pTaskType);
|
||||
}
|
||||
|
@ -953,10 +950,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
|||
const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
|
||||
const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;
|
||||
|
||||
ASSERT(user);
|
||||
ASSERT(pass);
|
||||
ASSERT(conf->groupId[0]);
|
||||
|
||||
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
|
||||
pTmq->mqueue = taosOpenQueue();
|
||||
pTmq->qall = taosAllocateQall();
|
||||
|
@ -1247,8 +1240,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
|
||||
tDecoderClear(&decoder);
|
||||
memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
taosMemoryFree(pMsg->pData);
|
||||
|
|
|
@ -133,7 +133,10 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
|
|||
SMnode *pMnode = pMsg->info.node;
|
||||
SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
|
||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
|
||||
ASSERT(pConsumer);
|
||||
if (pConsumer == NULL) {
|
||||
mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
mInfo("receive consumer recover msg, consumer id %" PRId64 ", status %s", pRecoverMsg->consumerId,
|
||||
mndConsumerStatusName(pConsumer->status));
|
||||
|
@ -381,8 +384,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
ASSERT(strcmp(req.cgroup, pConsumer->cgroup) == 0);
|
||||
|
||||
atomic_store_32(&pConsumer->hbStatus, 0);
|
||||
|
||||
// 1. check consumer status
|
||||
|
@ -428,9 +429,8 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
|||
for (int32_t i = 0; i < numOfTopics; i++) {
|
||||
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
|
||||
|
||||
// txn guarantees pSub is created
|
||||
ASSERT(pSub);
|
||||
|
||||
taosRLockLatch(&pSub->lock);
|
||||
|
||||
SMqSubTopicEp topicEp = {0};
|
||||
|
@ -438,7 +438,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
|||
|
||||
// 2.1 fetch topic schema
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||
ASSERT(pTopic);
|
||||
taosRLockLatch(&pTopic->lock);
|
||||
tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
|
||||
topicEp.schema.nCols = pTopic->schema.nCols;
|
||||
|
@ -775,8 +774,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
taosWLockLatch(&pOldConsumer->lock);
|
||||
|
||||
if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) {
|
||||
ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
|
||||
ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);
|
||||
/*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
|
||||
/*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
|
||||
|
||||
if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) {
|
||||
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
|
||||
|
@ -798,8 +797,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
|
||||
}
|
||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
|
||||
ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
|
||||
ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);
|
||||
/*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
|
||||
/*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
|
||||
|
||||
int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
|
||||
/*pOldConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));*/
|
||||
|
@ -812,8 +811,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
|
||||
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
|
||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
|
||||
ASSERT(taosArrayGetSize(pOldConsumer->currentTopics) == 0);
|
||||
ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
|
||||
/*A(taosArrayGetSize(pOldConsumer->currentTopics) == 0);*/
|
||||
/*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
|
||||
|
||||
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
|
@ -830,15 +829,15 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
|
||||
|
||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
|
||||
ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1);
|
||||
ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);
|
||||
/*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1);*/
|
||||
/*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);*/
|
||||
|
||||
char *addedTopic = strdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
|
||||
// not exist in current topic
|
||||
#if 1
|
||||
#if 0
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->currentTopics); i++) {
|
||||
char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
|
||||
ASSERT(strcmp(topic, addedTopic) != 0);
|
||||
A(strcmp(topic, addedTopic) != 0);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -879,15 +878,15 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
|
||||
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
|
||||
ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);
|
||||
ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);
|
||||
/*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);*/
|
||||
/*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);*/
|
||||
char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
|
||||
|
||||
// not exist in new topic
|
||||
#if 1
|
||||
#if 0
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
|
||||
char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
|
||||
ASSERT(strcmp(topic, removedTopic) != 0);
|
||||
A(strcmp(topic, removedTopic) != 0);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -913,7 +912,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
}
|
||||
}
|
||||
// must find the topic
|
||||
ASSERT(i < sz);
|
||||
/*A(i < sz);*/
|
||||
|
||||
// set status
|
||||
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
|
||||
|
|
|
@ -115,13 +115,11 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
|
|||
|
||||
if (pStream->fixedSinkVgId == 0) {
|
||||
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
|
||||
ASSERT(pDb);
|
||||
if (pDb->cfg.numOfVgroups > 1) {
|
||||
isShuffle = true;
|
||||
pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
|
||||
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
||||
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -140,9 +138,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
|
|||
for (int32_t j = 0; j < sinkLvSize; j++) {
|
||||
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
|
||||
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
|
||||
ASSERT(pVgInfo->vgId > 0);
|
||||
pVgInfo->taskId = pLastLevelTask->taskId;
|
||||
ASSERT(pVgInfo->taskId != 0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -152,7 +148,6 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
|
|||
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
||||
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
|
||||
// one sink only
|
||||
ASSERT(taosArrayGetSize(pArray) == 1);
|
||||
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
|
||||
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
|
||||
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
|
||||
|
@ -170,7 +165,6 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, co
|
|||
plan->execNode.epSet = pTask->epSet;
|
||||
|
||||
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
|
||||
ASSERT(0);
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
}
|
||||
|
@ -195,7 +189,6 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan,
|
|||
plan->execNode.epSet = pTask->epSet;
|
||||
|
||||
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
|
||||
ASSERT(0);
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
}
|
||||
|
@ -222,8 +215,6 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
void* pIter = NULL;
|
||||
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
|
||||
|
||||
ASSERT(taosArrayGetSize(pStream->tasks) == 1);
|
||||
|
||||
while (1) {
|
||||
SVgObj* pVgroup = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||
|
@ -257,7 +248,10 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
pTask->tbSink.stbUid = pStream->targetStbUid;
|
||||
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
|
||||
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||
ASSERT(pTask->tbSink.pSchemaWrapper);
|
||||
if (pTask->tbSink.pSchemaWrapper == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
@ -265,7 +259,6 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
}
|
||||
|
||||
int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
|
||||
ASSERT(pStream->fixedSinkVgId != 0);
|
||||
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
|
||||
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
||||
if (pTask == NULL) {
|
||||
|
@ -275,8 +268,6 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
pTask->fillHistory = pStream->fillHistory;
|
||||
mndAddTaskToTaskSet(tasks, pTask);
|
||||
|
||||
ASSERT(pStream->fixedSinkVg.vgId == pStream->fixedSinkVgId);
|
||||
|
||||
pTask->nodeId = pStream->fixedSinkVgId;
|
||||
#if 0
|
||||
SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId);
|
||||
|
@ -311,13 +302,16 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
return -1;
|
||||
}
|
||||
int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans);
|
||||
ASSERT(planTotLevel <= 2);
|
||||
|
||||
pStream->tasks = taosArrayInit(planTotLevel, sizeof(void*));
|
||||
|
||||
bool hasExtraSink = false;
|
||||
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
|
||||
SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
|
||||
ASSERT(pDbObj != NULL);
|
||||
if (pDbObj == NULL) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
}
|
||||
|
||||
bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
|
||||
sdbRelease(pSdb, pDbObj);
|
||||
|
@ -351,7 +345,10 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
|
||||
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
|
||||
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
||||
ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);
|
||||
if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pInnerTask = tNewSStreamTask(pStream->uid);
|
||||
if (pInnerTask == NULL) {
|
||||
|
@ -409,7 +406,10 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
|
||||
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1);
|
||||
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
||||
ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
|
||||
if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
}
|
||||
|
||||
void* pIter = NULL;
|
||||
while (1) {
|
||||
|
@ -471,9 +471,15 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
taosArrayPush(pStream->tasks, &taskOneLevel);
|
||||
|
||||
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
|
||||
ASSERT(LIST_LENGTH(inner->pNodeList) == 1);
|
||||
if (LIST_LENGTH(inner->pNodeList) != 1) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
}
|
||||
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
||||
ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
|
||||
if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
|
||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
}
|
||||
|
||||
void* pIter = NULL;
|
||||
while (1) {
|
||||
|
@ -550,9 +556,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
|
||||
}
|
||||
|
||||
ASSERT(pSub->unassignedVgs);
|
||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
|
||||
|
||||
void* pIter = NULL;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||
|
@ -590,10 +593,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
ASSERT(pSub->unassignedVgs->size > 0);
|
||||
|
||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
|
||||
|
||||
qDestroyQueryPlan(pPlan);
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -326,13 +326,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
|
||||
// deserialize ast
|
||||
if (nodesStringToNode(pObj->ast, &pAst) < 0) {
|
||||
/*ASSERT(0);*/
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
// extract output schema from ast
|
||||
if (qExtractResultSchema(pAst, (int32_t *)&pObj->outputSchema.nCols, &pObj->outputSchema.pSchema) != 0) {
|
||||
/*ASSERT(0);*/
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -347,13 +345,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
|
||||
// using ast and param to build physical plan
|
||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
||||
/*ASSERT(0);*/
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
// save physcial plan
|
||||
if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) {
|
||||
/*ASSERT(0);*/
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -361,7 +357,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
if (pCreate->numOfTags) {
|
||||
pObj->tagSchema.pSchema = taosMemoryCalloc(pCreate->numOfTags, sizeof(SSchema));
|
||||
}
|
||||
ASSERT(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));
|
||||
/*A(pCreate->numOfTags == taosArrayGetSize(pCreate->pTags));*/
|
||||
for (int32_t i = 0; i < pCreate->numOfTags; i++) {
|
||||
SField *pField = taosArrayGet(pCreate->pTags, i);
|
||||
pObj->tagSchema.pSchema[i].colId = pObj->outputSchema.nCols + i + 1;
|
||||
|
@ -378,9 +374,6 @@ FAIL:
|
|||
}
|
||||
|
||||
int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
|
||||
if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
||||
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
|
||||
}
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, NULL, 0);
|
||||
tEncodeSStreamTask(&encoder, pTask);
|
||||
|
@ -545,8 +538,6 @@ _OVER:
|
|||
}
|
||||
|
||||
static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
|
||||
ASSERT(pTask->nodeId != 0);
|
||||
|
||||
// vnode
|
||||
/*if (pTask->nodeId > 0) {*/
|
||||
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
|
||||
|
@ -800,10 +791,9 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
|||
int32_t sz = taosArrayGetSize(pLevel);
|
||||
for (int32_t j = 0; j < sz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
||||
ASSERT(pTask->nodeId > 0);
|
||||
/*A(pTask->nodeId > 0);*/
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->nodeId);
|
||||
if (pVgObj == NULL) {
|
||||
ASSERT(0);
|
||||
taosRUnLockLatch(&pStream->lock);
|
||||
mndReleaseStream(pMnode, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
|
@ -863,7 +853,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
SMDropStreamReq dropReq = {0};
|
||||
if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
|
||||
ASSERT(0);
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -92,21 +92,21 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
|||
taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
|
||||
|
||||
if (tqMetaOpen(pTq) < 0) {
|
||||
ASSERT(0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pTq->pOffsetStore = tqOffsetOpen(pTq);
|
||||
if (pTq->pOffsetStore == NULL) {
|
||||
ASSERT(0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId);
|
||||
if (pTq->pStreamMeta == NULL) {
|
||||
ASSERT(0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (streamLoadTasks(pTq->pStreamMeta) < 0) {
|
||||
ASSERT(0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pTq;
|
||||
|
@ -166,19 +166,17 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
|
|||
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||
SMqDataRsp* pRsp = &pPushEntry->dataRsp;
|
||||
|
||||
ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||
ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||
#if 0
|
||||
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||
|
||||
ASSERT(!pRsp->withSchema);
|
||||
ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||
A(!pRsp->withSchema);
|
||||
A(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||
|
||||
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||
/*if (pRsp->blockNum > 0) {*/
|
||||
/*ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);*/
|
||||
/*} else {*/
|
||||
ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||
/*}*/
|
||||
A(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
|
@ -223,19 +221,21 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
|||
}
|
||||
|
||||
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) {
|
||||
ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||
ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||
#if 0
|
||||
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||
|
||||
ASSERT(!pRsp->withSchema);
|
||||
ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||
A(!pRsp->withSchema);
|
||||
A(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||
|
||||
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||
if (pRsp->blockNum > 0) {
|
||||
ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||
A(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||
} else {
|
||||
ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
|
||||
A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
|
@ -279,22 +279,24 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
|||
}
|
||||
|
||||
int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
|
||||
ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||
ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||
#if 0
|
||||
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||
|
||||
if (pRsp->withSchema) {
|
||||
ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
|
||||
A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
|
||||
} else {
|
||||
ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||
A(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||
}
|
||||
|
||||
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||
if (pRsp->blockNum > 0) {
|
||||
ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||
A(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||
} else {
|
||||
ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
|
||||
A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
|
@ -348,7 +350,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m
|
|||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, msg, msgLen);
|
||||
if (tDecodeSTqOffset(&decoder, &offset) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -362,8 +363,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m
|
|||
if (offset.val.version + 1 == version) {
|
||||
offset.val.version += 1;
|
||||
}
|
||||
} else {
|
||||
ASSERT(0);
|
||||
/*} else {*/
|
||||
/*A(0);*/
|
||||
}
|
||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
|
||||
if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) {
|
||||
|
@ -371,7 +372,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m
|
|||
}
|
||||
|
||||
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -434,7 +434,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
|
|||
}
|
||||
#endif
|
||||
|
||||
ASSERT(subType == TOPIC_SUB_TYPE__COLUMN);
|
||||
/*A(subType == TOPIC_SUB_TYPE__COLUMN);*/
|
||||
pRsp->withSchema = false;
|
||||
|
||||
return 0;
|
||||
|
@ -473,7 +473,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
// 1.find handle
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||
/*ASSERT(pHandle);*/
|
||||
if (pHandle == NULL) {
|
||||
tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId,
|
||||
TD_VID(pTq->pVnode), req.subKey);
|
||||
|
@ -560,7 +559,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
|
||||
// lock
|
||||
taosWLockLatch(&pTq->pushLock);
|
||||
tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);
|
||||
if (tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
#if 1
|
||||
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
||||
|
@ -599,7 +600,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
// for taosx
|
||||
ASSERT(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN);
|
||||
/*A(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN);*/
|
||||
|
||||
SMqMetaRsp metaRsp = {0};
|
||||
|
||||
|
@ -607,7 +608,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
tqInitTaosxRsp(&taosxRsp, &req);
|
||||
|
||||
if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
|
||||
tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew);
|
||||
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (metaRsp.metaRspLen > 0) {
|
||||
if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) {
|
||||
|
@ -690,8 +693,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
} else {
|
||||
ASSERT(pHandle->fetchMeta);
|
||||
ASSERT(IS_META_MSG(pHead->msgType));
|
||||
/*A(pHandle->fetchMeta);*/
|
||||
/*A(IS_META_MSG(pHead->msgType));*/
|
||||
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
|
||||
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
|
||||
metaRsp.resMsgType = pHead->msgType;
|
||||
|
@ -808,7 +811,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
|||
// TODO version should be assigned and refed during preprocess
|
||||
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
|
||||
if (pRef == NULL) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
int64_t ver = pRef->refVer;
|
||||
|
@ -829,12 +831,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
|||
|
||||
pHandle->execHandle.task =
|
||||
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, NULL);
|
||||
ASSERT(pHandle->execHandle.task);
|
||||
/*A(pHandle->execHandle.task);*/
|
||||
void* scanner = NULL;
|
||||
qExtractStreamScanner(pHandle->execHandle.task, &scanner);
|
||||
ASSERT(scanner);
|
||||
/*A(scanner);*/
|
||||
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
||||
ASSERT(pHandle->execHandle.pExecReader);
|
||||
/*A(pHandle->execHandle.pExecReader);*/
|
||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
||||
|
@ -867,19 +869,14 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
|||
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
|
||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
}
|
||||
} else {
|
||||
/*ASSERT(pExec->consumerId == req.oldConsumerId);*/
|
||||
// TODO handle qmsg and exec modification
|
||||
atomic_store_32(&pHandle->epoch, -1);
|
||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||
taosMemoryFree(req.qmsg);
|
||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
}
|
||||
// close handle
|
||||
}
|
||||
|
@ -888,9 +885,11 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
|||
}
|
||||
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||
#if 0
|
||||
if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
||||
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
|
||||
A(taosArrayGetSize(pTask->childEpInfo) != 0);
|
||||
}
|
||||
#endif
|
||||
|
||||
pTask->refCnt = 1;
|
||||
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
||||
|
@ -927,7 +926,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
|||
.pStateBackend = pTask->pState,
|
||||
};
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||
ASSERT(pTask->exec.executor);
|
||||
if (pTask->exec.executor == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
||||
|
@ -940,7 +941,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
|||
.pStateBackend = pTask->pState,
|
||||
};
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
|
||||
ASSERT(pTask->exec.executor);
|
||||
if (pTask->exec.executor == NULL) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// sink
|
||||
|
@ -952,12 +955,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
|||
pTask->tbSink.vnode = pTq->pVnode;
|
||||
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;
|
||||
|
||||
ASSERT(pTask->tbSink.pSchemaWrapper);
|
||||
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
|
||||
/*A(pTask->tbSink.pSchemaWrapper);*/
|
||||
/*A(pTask->tbSink.pSchemaWrapper->pSchema);*/
|
||||
|
||||
pTask->tbSink.pTSchema =
|
||||
tdGetSTSChemaFromSSChema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, 1);
|
||||
ASSERT(pTask->tbSink.pTSchema);
|
||||
/*A(pTask->tbSink.pTSchema);*/
|
||||
}
|
||||
|
||||
streamSetupTrigger(pTask);
|
||||
|
@ -1003,7 +1006,8 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
int32_t len;
|
||||
tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
tqError("unable to encode rsp %d", __LINE__);
|
||||
return -1;
|
||||
}
|
||||
void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
|
||||
((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);
|
||||
|
@ -1096,12 +1100,10 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
|
|||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
ASSERT(pReq->taskId == pTask->taskId);
|
||||
|
||||
// check param
|
||||
int64_t fillVer1 = pTask->startVer;
|
||||
if (fillVer1 <= 0) {
|
||||
ASSERT(0);
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return -1;
|
||||
}
|
||||
|
@ -1296,7 +1298,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
|||
}
|
||||
|
||||
int32_t ref = atomic_sub_fetch_32(pRef, 1);
|
||||
ASSERT(ref >= 0);
|
||||
/*A(ref >= 0);*/
|
||||
if (ref == 0) {
|
||||
blockDataDestroy(pDelBlock);
|
||||
taosMemoryFree(pRef);
|
||||
|
|
|
@ -29,7 +29,6 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
|
|||
|
||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
|
||||
actualLen += sizeof(SRetrieveTableRsp);
|
||||
ASSERT(actualLen <= dataStrLen);
|
||||
taosArrayPush(pRsp->blockDataLen, &actualLen);
|
||||
taosArrayPush(pRsp->blockData, &buf);
|
||||
return 0;
|
||||
|
@ -62,7 +61,6 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i
|
|||
|
||||
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
|
||||
const STqExecHandle* pExec = &pHandle->execHandle;
|
||||
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
|
||||
|
||||
qTaskInfo_t task = pExec->task;
|
||||
|
||||
|
@ -87,7 +85,8 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
|||
uint64_t ts = 0;
|
||||
tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId);
|
||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||
ASSERT(0);
|
||||
tqError("vgId:%d task exec error since %s", pTq->pVnode->config.vgId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
tqDebug("vgId:%d, tmq task executed, get %p", pTq->pVnode->config.vgId, pDataBlock);
|
||||
|
||||
|
@ -105,10 +104,14 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
|||
}
|
||||
|
||||
if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
ASSERT(pRsp->rspOffset.type != 0);
|
||||
|
||||
if (pRsp->rspOffset.type == 0) {
|
||||
tqError("expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, pRsp->rspOffset.type, pRsp->rspOffset.ts,
|
||||
pRsp->rspOffset.uid, pRsp->rspOffset.version);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pRsp->withTbName) {
|
||||
if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) {
|
||||
|
@ -118,7 +121,6 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
|||
pRsp->withTbName = false;
|
||||
}
|
||||
}
|
||||
ASSERT(pRsp->withSchema == false);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -148,7 +150,8 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
|
|||
uint64_t ts = 0;
|
||||
tqDebug("tmqsnap task start to execute");
|
||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||
ASSERT(0);
|
||||
tqError("vgId:%d task exec error since %s", pTq->pVnode->config.vgId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
tqDebug("tmqsnap task execute end, get %p", pDataBlock);
|
||||
|
||||
|
@ -215,17 +218,20 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
|
|||
break;
|
||||
}
|
||||
|
||||
if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) {
|
||||
ASSERT(0);
|
||||
qStreamExtractOffset(task, &pRsp->rspOffset);
|
||||
|
||||
if (pRsp->rspOffset.type == 0) {
|
||||
tqError("expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, pRsp->rspOffset.type, pRsp->rspOffset.ts,
|
||||
pRsp->rspOffset.uid, pRsp->rspOffset.version);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ASSERT(pRsp->rspOffset.type != 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp) {
|
||||
STqExecHandle* pExec = &pHandle->execHandle;
|
||||
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
|
||||
/*A(pExec->subType != TOPIC_SUB_TYPE__COLUMN);*/
|
||||
|
||||
SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
SArray* pSchemas = taosArrayInit(0, sizeof(void*));
|
||||
|
|
|
@ -71,17 +71,14 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
|
|||
|
||||
int32_t tqMetaOpen(STQ* pTq) {
|
||||
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -135,19 +132,19 @@ int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) {
|
|||
|
||||
if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
|
||||
0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), txn) < 0) {
|
||||
/*ASSERT(0);*/
|
||||
tqWarn("vgId:%d, tq try delete checkinfo failed %s", pTq->pVnode->config.vgId, key);
|
||||
}
|
||||
|
||||
if (tdbCommit(pTq->pMetaDB, txn) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -156,7 +153,6 @@ int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) {
|
|||
int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
|
||||
TBC* pCur = NULL;
|
||||
if (tdbTbcOpen(pTq->pCheckStore, &pCur, NULL) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -197,40 +193,42 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
|
|||
int32_t code;
|
||||
int32_t vlen;
|
||||
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
|
||||
ASSERT(code == 0);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey),
|
||||
pHandle->consumerId, TD_VID(pTq->pVnode));
|
||||
|
||||
void* buf = taosMemoryCalloc(1, vlen);
|
||||
if (buf == NULL) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, buf, vlen);
|
||||
|
||||
if (tEncodeSTqHandle(&encoder, pHandle) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
TXN* txn;
|
||||
|
||||
if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
|
||||
0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tdbCommit(pTq->pMetaDB, txn) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tEncoderClear(&encoder);
|
||||
|
@ -243,19 +241,18 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
|
|||
|
||||
if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
|
||||
0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), txn) < 0) {
|
||||
/*ASSERT(0);*/
|
||||
}
|
||||
|
||||
if (tdbCommit(pTq->pMetaDB, txn) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -264,7 +261,6 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
|
|||
int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||
TBC* pCur = NULL;
|
||||
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -284,7 +280,6 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
|||
|
||||
handle.pRef = walOpenRef(pTq->pVnode->pWal);
|
||||
if (handle.pRef == NULL) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
walRefVer(handle.pRef, handle.snapshotVer);
|
||||
|
@ -300,12 +295,19 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
|||
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
handle.execHandle.task =
|
||||
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, NULL);
|
||||
ASSERT(handle.execHandle.task);
|
||||
if (handle.execHandle.task == NULL) {
|
||||
tqError("cannot create exec task for %s", handle.subKey);
|
||||
return -1;
|
||||
}
|
||||
void* scanner = NULL;
|
||||
qExtractStreamScanner(handle.execHandle.task, &scanner);
|
||||
ASSERT(scanner);
|
||||
if (scanner == NULL) {
|
||||
tqError("cannot extract stream scanner for %s", handle.subKey);
|
||||
}
|
||||
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
||||
ASSERT(handle.execHandle.pExecReader);
|
||||
if (handle.execHandle.pExecReader == NULL) {
|
||||
tqError("cannot extract exec reader for %s", handle.subKey);
|
||||
}
|
||||
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
||||
|
|
|
@ -40,26 +40,23 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
|
|||
if (code == 0) {
|
||||
break;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
// TODO handle error
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
int32_t size = htonl(head.size);
|
||||
void* memBuf = taosMemoryCalloc(1, size);
|
||||
if ((code = taosReadFile(pFile, memBuf, size)) != size) {
|
||||
ASSERT(0);
|
||||
// TODO handle error
|
||||
return -1;
|
||||
}
|
||||
STqOffset offset;
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, memBuf, size);
|
||||
if (tDecodeSTqOffset(&decoder, &offset) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) {
|
||||
ASSERT(0);
|
||||
// TODO
|
||||
return -1;
|
||||
}
|
||||
taosMemoryFree(memBuf);
|
||||
}
|
||||
|
@ -85,7 +82,9 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
|
|||
}
|
||||
char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
|
||||
if (tqOffsetRestoreFromFile(pStore, fname) < 0) {
|
||||
ASSERT(0);
|
||||
taosMemoryFree(fname);
|
||||
taosMemoryFree(pStore);
|
||||
return NULL;
|
||||
}
|
||||
taosMemoryFree(fname);
|
||||
return pStore;
|
||||
|
@ -124,7 +123,6 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
|
|||
const char* sysErrStr = strerror(errno);
|
||||
tqError("vgId:%d, cannot open file %s when commit offset since %s", pStore->pTq->pVnode->config.vgId, fname,
|
||||
sysErrStr);
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
taosMemoryFree(fname);
|
||||
|
@ -136,9 +134,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
|
|||
int32_t bodyLen;
|
||||
int32_t code;
|
||||
tEncodeSize(tEncodeSTqOffset, pOffset, bodyLen, code);
|
||||
ASSERT(code == 0);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
taosHashCancelIterate(pStore->pHash, pIter);
|
||||
return -1;
|
||||
}
|
||||
|
@ -154,7 +150,6 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
|
|||
// write file
|
||||
int64_t writeLen;
|
||||
if ((writeLen = taosWriteFile(pFile, buf, totLen)) != totLen) {
|
||||
ASSERT(0);
|
||||
tqError("write offset incomplete, len %d, write len %" PRId64, bodyLen, writeLen);
|
||||
taosHashCancelIterate(pStore->pHash, pIter);
|
||||
taosMemoryFree(buf);
|
||||
|
|
|
@ -56,24 +56,28 @@ int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData) {
|
|||
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
|
||||
if (pFile == NULL) {
|
||||
taosMemoryFree(fname);
|
||||
return 0;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int64_t sz = 0;
|
||||
if (taosStatFile(fname, &sz, NULL) < 0) {
|
||||
ASSERT(0);
|
||||
taosCloseFile(&pFile);
|
||||
taosMemoryFree(fname);
|
||||
return -1;
|
||||
}
|
||||
taosMemoryFree(fname);
|
||||
|
||||
SSnapDataHdr* buf = taosMemoryCalloc(1, sz + sizeof(SSnapDataHdr));
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosCloseFile(&pFile);
|
||||
return terrno;
|
||||
}
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SSnapDataHdr));
|
||||
int64_t contLen = taosReadFile(pFile, abuf, sz);
|
||||
if (contLen != sz) {
|
||||
ASSERT(0);
|
||||
taosCloseFile(&pFile);
|
||||
taosMemoryFree(buf);
|
||||
return -1;
|
||||
}
|
||||
buf->size = sz;
|
||||
|
@ -122,14 +126,17 @@ int32_t tqOffsetWriterClose(STqOffsetWriter** ppWriter, int8_t rollback) {
|
|||
|
||||
if (rollback) {
|
||||
if (taosRemoveFile(pWriter->fname) < 0) {
|
||||
ASSERT(0);
|
||||
taosMemoryFree(fname);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
if (taosRenameFile(pWriter->fname, fname) < 0) {
|
||||
ASSERT(0);
|
||||
taosMemoryFree(fname);
|
||||
return -1;
|
||||
}
|
||||
if (tqOffsetRestoreFromFile(pTq->pOffsetStore, fname) < 0) {
|
||||
ASSERT(0);
|
||||
taosMemoryFree(fname);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
taosMemoryFree(fname);
|
||||
|
@ -146,14 +153,13 @@ int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nDa
|
|||
TdFilePtr pFile = taosOpenFile(pWriter->fname, TD_FILE_CREATE | TD_FILE_WRITE);
|
||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
||||
int64_t size = pHdr->size;
|
||||
ASSERT(size == nData - sizeof(SSnapDataHdr));
|
||||
if (pFile) {
|
||||
int64_t contLen = taosWriteFile(pFile, pHdr->data, size);
|
||||
if (contLen != size) {
|
||||
ASSERT(0);
|
||||
taosCloseFile(&pFile);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
|
|
|
@ -25,9 +25,7 @@ void tqTmrRspFunc(void* param, void* tmrId) {
|
|||
static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubmit** ppSubmit, SMqDataRsp* pRsp) {
|
||||
SStreamDataSubmit* pSubmit = *ppSubmit;
|
||||
while (pSubmit != NULL) {
|
||||
ASSERT(pSubmit->ver == pHandle->pushHandle.processedVer + 1);
|
||||
if (tqLogScanExec(pTq, &pHandle->execHandle, pSubmit->data, pRsp, 0) < 0) {
|
||||
/*ASSERT(0);*/
|
||||
}
|
||||
// update processed
|
||||
atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver);
|
||||
|
@ -160,8 +158,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
|||
if (msgType == TDMT_VND_SUBMIT) {
|
||||
tqLogScanExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId);
|
||||
} else {
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
tqError("tq push unexpected msg type %d", msgType);
|
||||
}
|
||||
|
||||
if (rsp.blockNum == 0) {
|
||||
|
@ -169,9 +166,6 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
|||
continue;
|
||||
}
|
||||
|
||||
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
|
||||
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
|
||||
|
||||
rsp.rspOffset = fetchOffset;
|
||||
|
||||
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
|
||||
|
@ -263,7 +257,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
SSDataBlock* pDataBlock = NULL;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||
ASSERT(0);
|
||||
tqDebug("vgId:%d, tq exec error since %s", pTq->pVnode->config.vgId, terrstr());
|
||||
}
|
||||
|
||||
if (pDataBlock == NULL) {
|
||||
|
@ -282,7 +276,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
// remove from hash
|
||||
size_t kLen;
|
||||
void* key = taosHashGetKey(pIter, &kLen);
|
||||
void* keyCopy = taosMemoryMalloc(kLen);
|
||||
void* keyCopy = taosMemoryCalloc(1, kLen + 1);
|
||||
memcpy(keyCopy, key, kLen);
|
||||
|
||||
taosArrayPush(cachedKeys, &keyCopy);
|
||||
|
@ -296,7 +290,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
void* key = taosArrayGetP(cachedKeys, i);
|
||||
size_t kLen = *(size_t*)taosArrayGet(cachedKeyLens, i);
|
||||
if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) {
|
||||
ASSERT(0);
|
||||
tqError("vgId:%d, tq push hash remove key error, key: %s", pTq->pVnode->config.vgId, (char*)key);
|
||||
}
|
||||
}
|
||||
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
|
||||
|
|
|
@ -176,8 +176,6 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
|
|||
goto end;
|
||||
}
|
||||
realTbSuid = req.suid;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
end:
|
||||
|
@ -206,7 +204,6 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
|
|||
code = walFetchBody(pHandle->pWalReader, ppCkHead);
|
||||
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
*fetchOffset = offset;
|
||||
code = -1;
|
||||
goto END;
|
||||
|
@ -220,7 +217,6 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
|
|||
if (IS_META_MSG(pHead->msgType)) {
|
||||
code = walFetchBody(pHandle->pWalReader, ppCkHead);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
*fetchOffset = offset;
|
||||
code = -1;
|
||||
goto END;
|
||||
|
@ -238,7 +234,6 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
|
|||
}
|
||||
code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
*fetchOffset = offset;
|
||||
code = -1;
|
||||
goto END;
|
||||
|
@ -297,11 +292,8 @@ void tqCloseReader(STqReader* pReader) {
|
|||
|
||||
int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
|
||||
if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
|
||||
ASSERT(pReader->pWalReader->curInvalid);
|
||||
ASSERT(pReader->pWalReader->curVersion == ver);
|
||||
return -1;
|
||||
}
|
||||
ASSERT(pReader->pWalReader->curVersion == ver);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -317,7 +309,6 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
|||
ret->offset.version = pReader->ver;
|
||||
ret->fetchType = FETCH_TYPE__NONE;
|
||||
tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version);
|
||||
ASSERT(ret->offset.version >= 0);
|
||||
return -1;
|
||||
}
|
||||
void* body = pReader->pWalReader->pHead->head.body;
|
||||
|
@ -340,7 +331,6 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
|||
memset(&ret->data, 0, sizeof(SSDataBlock));
|
||||
int32_t code = tqRetrieveDataBlock(&ret->data, pReader);
|
||||
if (code != 0 || ret->data.info.rows == 0) {
|
||||
ASSERT(0);
|
||||
continue;
|
||||
}
|
||||
ret->fetchType = FETCH_TYPE__DATA;
|
||||
|
@ -351,7 +341,6 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
|||
if (fromProcessedMsg) {
|
||||
ret->offset.type = TMQ_OFFSET__LOG;
|
||||
ret->offset.version = pReader->ver;
|
||||
ASSERT(pReader->ver >= 0);
|
||||
ret->fetchType = FETCH_TYPE__SEP;
|
||||
tqDebug("return offset %" PRId64 ", processed finish", ret->offset.version);
|
||||
return 0;
|
||||
|
@ -434,7 +423,6 @@ bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) {
|
|||
}
|
||||
if (pHandle->pBlock == NULL) return false;
|
||||
|
||||
ASSERT(pHandle->tbIdHash == NULL);
|
||||
void* ret = taosHashGet(filterOutUids, &pHandle->msgIter.uid, sizeof(int64_t));
|
||||
if (ret == NULL) {
|
||||
return true;
|
||||
|
@ -453,7 +441,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
|
|||
if (pReader->pSchema == NULL) {
|
||||
tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table",
|
||||
pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer);
|
||||
/*ASSERT(0);*/
|
||||
pReader->cachedSchemaSuid = 0;
|
||||
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
||||
return -1;
|
||||
|
@ -464,7 +451,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
|
|||
if (pReader->pSchemaWrapper == NULL) {
|
||||
tqWarn("cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
|
||||
pReader->msgIter.uid, pReader->cachedSchemaVer);
|
||||
/*ASSERT(0);*/
|
||||
pReader->cachedSchemaSuid = 0;
|
||||
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
||||
return -1;
|
||||
|
@ -567,7 +553,6 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
|
|||
if (pReader->pSchema == NULL) {
|
||||
tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table",
|
||||
pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer);
|
||||
/*ASSERT(0);*/
|
||||
pReader->cachedSchemaSuid = 0;
|
||||
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
||||
return -1;
|
||||
|
@ -578,7 +563,6 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
|
|||
if (pReader->pSchemaWrapper == NULL) {
|
||||
tqWarn("cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
|
||||
pReader->msgIter.uid, pReader->cachedSchemaVer);
|
||||
/*ASSERT(0);*/
|
||||
pReader->cachedSchemaSuid = 0;
|
||||
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
||||
return -1;
|
||||
|
@ -671,8 +655,6 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
|
|||
break;
|
||||
}
|
||||
|
||||
ASSERT(sVal.valType != TD_VTYPE_NONE);
|
||||
|
||||
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
|
@ -732,8 +714,6 @@ int tqReaderAddTbUidList(STqReader* pReader, const SArray* tbUidList) {
|
|||
}
|
||||
|
||||
int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
|
||||
ASSERT(pReader->tbIdHash != NULL);
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
|
||||
taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t));
|
||||
|
@ -750,7 +730,10 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
|||
STqHandle* pExec = (STqHandle*)pIter;
|
||||
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
int32_t code = qUpdateQualifiedTableId(pExec->execHandle.task, tbUidList, isAdd);
|
||||
ASSERT(code == 0);
|
||||
if (code != 0) {
|
||||
tqError("update qualified table error for %s", pExec->subKey);
|
||||
continue;
|
||||
}
|
||||
} else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||
if (!isAdd) {
|
||||
int32_t sz = taosArrayGetSize(tbUidList);
|
||||
|
@ -769,7 +752,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
|||
|
||||
int32_t code = metaGetTableEntryByUidCache(&mr, *id);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno));
|
||||
tqError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno));
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -790,8 +773,6 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
|||
} else {
|
||||
// TODO handle delete table from stb
|
||||
}
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
while (1) {
|
||||
|
@ -800,7 +781,10 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
|||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||
int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd);
|
||||
ASSERT(code == 0);
|
||||
if (code != 0) {
|
||||
tqError("update qualified table error for stream task %d", pTask->taskId);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock,
|
||||
SBatchDeleteReq* deleteReq) {
|
||||
ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT);
|
||||
int32_t totRow = pDataBlock->info.rows;
|
||||
SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
|
@ -334,8 +333,8 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
|||
int32_t code;
|
||||
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
|
||||
if (code < 0) {
|
||||
//
|
||||
ASSERT(0);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return;
|
||||
}
|
||||
SEncoder encoder;
|
||||
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
|
||||
|
@ -559,7 +558,6 @@ void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data
|
|||
|
||||
tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, (int32_t)pRes->size);
|
||||
|
||||
ASSERT(pTask->tbSink.pTSchema);
|
||||
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
||||
SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, pTask->tbSink.pSchemaWrapper, true,
|
||||
pTask->tbSink.stbUid, pTask->tbSink.stbFullName, &deleteReq);
|
||||
|
@ -570,10 +568,6 @@ void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data
|
|||
int32_t code;
|
||||
int32_t len;
|
||||
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
|
||||
if (code < 0) {
|
||||
//
|
||||
ASSERT(0);
|
||||
}
|
||||
SEncoder encoder;
|
||||
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
|
||||
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
|
||||
|
|
|
@ -100,8 +100,6 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
|
|||
}
|
||||
}
|
||||
|
||||
ASSERT(pVal && vLen);
|
||||
|
||||
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
|
||||
if (*ppData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -100,8 +100,6 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
|
|||
}
|
||||
}
|
||||
|
||||
ASSERT(pVal && vLen);
|
||||
|
||||
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
|
||||
if (*ppData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -168,7 +166,6 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
|||
|
||||
if (rollback) {
|
||||
tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
|
||||
ASSERT(0);
|
||||
} else {
|
||||
code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
|
||||
if (code) goto _err;
|
||||
|
|
|
@ -100,8 +100,6 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
|
|||
}
|
||||
}
|
||||
|
||||
ASSERT(pVal && vLen);
|
||||
|
||||
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
|
||||
if (*ppData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -146,7 +144,7 @@ int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** p
|
|||
pWriter->sver = sver;
|
||||
pWriter->ever = ever;
|
||||
|
||||
if (tdbBegin(pTq->pMetaStore, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
|
||||
if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
|
||||
code = -1;
|
||||
taosMemoryFree(pWriter);
|
||||
goto _err;
|
||||
|
@ -167,12 +165,11 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
|||
STQ* pTq = pWriter->pTq;
|
||||
|
||||
if (rollback) {
|
||||
tdbAbort(pWriter->pTq->pMetaStore, pWriter->txn);
|
||||
ASSERT(0);
|
||||
tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
|
||||
} else {
|
||||
code = tdbCommit(pWriter->pTq->pMetaStore, pWriter->txn);
|
||||
code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
|
||||
if (code) goto _err;
|
||||
code = tdbPostCommit(pWriter->pTq->pMetaStore, pWriter->txn);
|
||||
code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
|
|
|
@ -768,8 +768,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
|
||||
ASSERT(pInfo->base.dataReader == NULL);
|
||||
|
||||
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num,
|
||||
pInfo->pResBlock, (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
|
||||
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
|
||||
(STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
@ -986,8 +986,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
|
|||
|
||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||
STsdbReader* pReader = NULL;
|
||||
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, (STsdbReader**)&pReader,
|
||||
GET_TASKID(pTaskInfo));
|
||||
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
|
||||
(STsdbReader**)&pReader, GET_TASKID(pTaskInfo));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
|
@ -995,7 +995,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
|
|||
}
|
||||
|
||||
if (tsdbNextDataBlock(pReader)) {
|
||||
/*SSDataBlock* p = */tsdbRetrieveDataBlock(pReader, NULL);
|
||||
/*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
|
||||
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
|
||||
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
|
||||
}
|
||||
|
@ -1224,7 +1224,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||
SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
|
||||
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
|
||||
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
|
||||
ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
|
||||
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
|
||||
|
@ -1347,6 +1347,36 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
|
|||
return code;
|
||||
}
|
||||
|
||||
#if 0
|
||||
void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
|
||||
SExprSupp* pTagCalSup = &pInfo->tagCalSup;
|
||||
SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
|
||||
if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return;
|
||||
if (pBlock == NULL || pBlock->info.rows == 0) return;
|
||||
|
||||
void* tag = NULL;
|
||||
int32_t tagLen = 0;
|
||||
if (streamStateGetParTag(pState, pBlock->info.id.groupId, &tag, &tagLen) == 0) {
|
||||
pBlock->info.tagLen = tagLen;
|
||||
void* pTag = taosMemoryRealloc(pBlock->info.pTag, tagLen);
|
||||
if (pTag == NULL) {
|
||||
tdbFree(tag);
|
||||
taosMemoryFree(pBlock->info.pTag);
|
||||
pBlock->info.pTag = NULL;
|
||||
pBlock->info.tagLen = 0;
|
||||
return;
|
||||
}
|
||||
pBlock->info.pTag = pTag;
|
||||
memcpy(pBlock->info.pTag, tag, tagLen);
|
||||
tdbFree(tag);
|
||||
return;
|
||||
} else {
|
||||
pBlock->info.pTag = NULL;
|
||||
}
|
||||
tdbFree(tag);
|
||||
}
|
||||
#endif
|
||||
|
||||
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
|
||||
SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup;
|
||||
SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
|
||||
|
@ -1354,10 +1384,12 @@ void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
|
|||
if (pBlock == NULL || pBlock->info.rows == 0) return;
|
||||
|
||||
void* tbname = NULL;
|
||||
if (streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
|
||||
pBlock->info.parTbName[0] = 0;
|
||||
} else {
|
||||
if (streamStateGetParName(pState, pBlock->info.id.groupId, &tbname) == 0) {
|
||||
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||
tdbFree(tbname);
|
||||
return;
|
||||
} else {
|
||||
pBlock->info.parTbName[0] = 0;
|
||||
}
|
||||
tdbFree(tbname);
|
||||
|
||||
|
@ -2285,7 +2317,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
if (pHandle->initTableReader) {
|
||||
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
|
||||
pTSInfo->base.dataReader = NULL;
|
||||
code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock, &pTSInfo->base.dataReader, NULL);
|
||||
code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock,
|
||||
&pTSInfo->base.dataReader, NULL);
|
||||
if (code != 0) {
|
||||
terrno = code;
|
||||
destroyTableScanOperatorInfo(pTableScanOp);
|
||||
|
@ -2355,7 +2388,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||
|
||||
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
|
||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
|
||||
|
||||
return pOperator;
|
||||
|
||||
|
@ -2492,7 +2526,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
|||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
|
||||
|
||||
return pOperator;
|
||||
|
||||
|
@ -2513,11 +2548,12 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
|
||||
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
|
||||
int64_t st = taosGetTimestampUs();
|
||||
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
|
||||
SReadHandle* pHandle = &pInfo->base.readHandle;
|
||||
|
||||
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
|
||||
int32_t code =
|
||||
tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
|
||||
if (code != 0) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
@ -2915,8 +2951,8 @@ static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo*
|
|||
SSDataBlock* pRes, char* dbName);
|
||||
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
|
||||
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
|
||||
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
|
||||
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
STableCountScanSupp* pSupp, SSDataBlock* pRes);
|
||||
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||
|
@ -3041,8 +3077,8 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC
|
|||
|
||||
setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, optrDefaultBufFn, NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
|
||||
optrDefaultBufFn, NULL);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
|
|
|
@ -159,6 +159,11 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
|
|||
goto _err;
|
||||
}
|
||||
|
||||
if (tdbTbOpen("partag.state.db", sizeof(int64_t), -1, NULL, pState->pTdbState->db, &pState->pTdbState->pParTagDb, 0) <
|
||||
0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (streamStateBegin(pState) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
@ -173,6 +178,7 @@ _err:
|
|||
tdbTbClose(pState->pTdbState->pFillStateDb);
|
||||
tdbTbClose(pState->pTdbState->pSessionStateDb);
|
||||
tdbTbClose(pState->pTdbState->pParNameDb);
|
||||
tdbTbClose(pState->pTdbState->pParTagDb);
|
||||
tdbClose(pState->pTdbState->db);
|
||||
streamStateDestroy(pState);
|
||||
return NULL;
|
||||
|
@ -186,6 +192,7 @@ void streamStateClose(SStreamState* pState) {
|
|||
tdbTbClose(pState->pTdbState->pFillStateDb);
|
||||
tdbTbClose(pState->pTdbState->pSessionStateDb);
|
||||
tdbTbClose(pState->pTdbState->pParNameDb);
|
||||
tdbTbClose(pState->pTdbState->pParTagDb);
|
||||
tdbClose(pState->pTdbState->db);
|
||||
|
||||
streamStateDestroy(pState);
|
||||
|
@ -821,10 +828,17 @@ _end:
|
|||
return res;
|
||||
}
|
||||
|
||||
int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
|
||||
return tdbTbUpsert(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tag, tagLen, pState->pTdbState->txn);
|
||||
}
|
||||
|
||||
int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) {
|
||||
return tdbTbGet(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tagVal, tagLen);
|
||||
}
|
||||
|
||||
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
|
||||
tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
|
||||
pState->pTdbState->txn);
|
||||
return 0;
|
||||
return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
|
||||
pState->pTdbState->txn);
|
||||
}
|
||||
|
||||
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
|
||||
|
|
|
@ -25,7 +25,7 @@ bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
|
|||
}
|
||||
|
||||
bool FORCE_INLINE walIsEmpty(SWal* pWal) {
|
||||
return (pWal->vers.firstVer == -1 || pWal->vers.lastVer < pWal->vers.firstVer); // [firstVer, lastVer + 1)
|
||||
return (pWal->vers.firstVer == -1 || pWal->vers.lastVer < pWal->vers.firstVer); // [firstVer, lastVer + 1)
|
||||
}
|
||||
|
||||
int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }
|
||||
|
@ -49,7 +49,6 @@ static FORCE_INLINE int walBuildTmpMetaName(SWal* pWal, char* buf) {
|
|||
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
|
||||
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
ASSERT(fileIdx >= 0 && fileIdx < sz);
|
||||
|
||||
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
|
@ -101,7 +100,6 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
|
|||
offsetBackward = offset;
|
||||
}
|
||||
|
||||
ASSERT(offset <= end);
|
||||
readSize = end - offset;
|
||||
capacity = readSize + sizeof(magic);
|
||||
|
||||
|
@ -257,7 +255,6 @@ static void walRebuildFileInfoSet(SArray* metaLogList, SArray* actualLogList) {
|
|||
SWalFileInfo* pLogInfo = taosArrayGet(actualLogList, i);
|
||||
while (j < metaFileNum) {
|
||||
SWalFileInfo* pMetaInfo = taosArrayGet(metaLogList, j);
|
||||
ASSERT(pMetaInfo != NULL);
|
||||
if (pMetaInfo->firstVer < pLogInfo->firstVer) {
|
||||
j++;
|
||||
} else if (pMetaInfo->firstVer == pLogInfo->firstVer) {
|
||||
|
@ -385,7 +382,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
|||
taosArrayDestroy(actualLog);
|
||||
|
||||
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
ASSERT(sz == actualFileNum);
|
||||
|
||||
// scan and determine the lastVer
|
||||
int32_t fileIdx = sz;
|
||||
|
@ -403,8 +399,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
ASSERT(pFileInfo->firstVer >= 0);
|
||||
|
||||
if (pFileInfo->lastVer >= pFileInfo->firstVer && fileSize == pFileInfo->fileSize) {
|
||||
totSize += pFileInfo->fileSize;
|
||||
continue;
|
||||
|
@ -417,7 +411,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
|||
wError("failed to scan wal last ver since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
ASSERT(pFileInfo->fileSize == 0);
|
||||
// remove the empty wal log, and its idx
|
||||
wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr);
|
||||
taosRemoveFile(fnameStr);
|
||||
|
@ -477,8 +470,7 @@ int walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) {
|
|||
}
|
||||
|
||||
int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
|
||||
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
ASSERT(fileIdx >= 0 && fileIdx < sz);
|
||||
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
|
||||
|
@ -492,7 +484,6 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
ASSERT(pFileInfo->fileSize > 0 && pFileInfo->firstVer >= 0 && pFileInfo->lastVer >= pFileInfo->firstVer);
|
||||
if (fileSize == (pFileInfo->lastVer - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry)) {
|
||||
return 0;
|
||||
}
|
||||
|
@ -556,7 +547,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
|
|||
}
|
||||
offset += sizeof(SWalIdxEntry);
|
||||
|
||||
ASSERT(offset == (idxEntry.ver - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry));
|
||||
/*A(offset == (idxEntry.ver - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry));*/
|
||||
|
||||
// ftruncate idx file
|
||||
if (offset < fileSize) {
|
||||
|
@ -577,7 +568,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
|
|||
}
|
||||
|
||||
while (idxEntry.ver < pFileInfo->lastVer) {
|
||||
ASSERT(idxEntry.ver == ckHead.head.version);
|
||||
/*A(idxEntry.ver == ckHead.head.version);*/
|
||||
|
||||
idxEntry.ver += 1;
|
||||
idxEntry.offset += sizeof(SWalCkHead) + ckHead.head.bodyLen;
|
||||
|
@ -649,8 +640,7 @@ int walRollFileInfo(SWal* pWal) {
|
|||
}
|
||||
|
||||
char* walMetaSerialize(SWal* pWal) {
|
||||
char buf[30];
|
||||
ASSERT(pWal->fileInfoSet);
|
||||
char buf[30];
|
||||
int sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
cJSON* pMeta = cJSON_CreateObject();
|
||||
|
@ -707,7 +697,7 @@ char* walMetaSerialize(SWal* pWal) {
|
|||
}
|
||||
|
||||
int walMetaDeserialize(SWal* pWal, const char* bytes) {
|
||||
ASSERT(taosArrayGetSize(pWal->fileInfoSet) == 0);
|
||||
/*A(taosArrayGetSize(pWal->fileInfoSet) == 0);*/
|
||||
cJSON *pRoot, *pMeta, *pFiles, *pInfoJson, *pField;
|
||||
pRoot = cJSON_Parse(bytes);
|
||||
if (!pRoot) goto _err;
|
||||
|
@ -823,7 +813,9 @@ int walSaveMeta(SWal* pWal) {
|
|||
|
||||
// flush to a tmpfile
|
||||
n = walBuildTmpMetaName(pWal, tmpFnameStr);
|
||||
ASSERT(n < sizeof(tmpFnameStr) && "Buffer overflow of file name");
|
||||
if (n >= sizeof(tmpFnameStr)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
TdFilePtr pMetaFile = taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pMetaFile == NULL) {
|
||||
|
@ -854,7 +846,9 @@ int walSaveMeta(SWal* pWal) {
|
|||
|
||||
// rename it
|
||||
n = walBuildMetaName(pWal, metaVer + 1, fnameStr);
|
||||
ASSERT(n < sizeof(fnameStr) && "Buffer overflow of file name");
|
||||
if (n >= sizeof(fnameStr)) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (taosRenameFile(tmpFnameStr, fnameStr) < 0) {
|
||||
wError("failed to rename file due to %s. dest:%s", strerror(errno), fnameStr);
|
||||
|
@ -877,7 +871,6 @@ _err:
|
|||
}
|
||||
|
||||
int walLoadMeta(SWal* pWal) {
|
||||
ASSERT(pWal->fileInfoSet->size == 0);
|
||||
// find existing meta file
|
||||
int metaVer = walFindCurMetaVer(pWal);
|
||||
if (metaVer == -1) {
|
||||
|
|
|
@ -97,7 +97,6 @@ int32_t walNextValidMsg(SWalReader *pReader) {
|
|||
return -1;
|
||||
}
|
||||
fetchVer++;
|
||||
ASSERT(fetchVer == pReader->curVersion);
|
||||
}
|
||||
}
|
||||
pReader->curStopped = 1;
|
||||
|
@ -132,7 +131,6 @@ static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int
|
|||
return -1;
|
||||
}
|
||||
|
||||
ASSERT(entry.ver == ver);
|
||||
ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
|
||||
if (ret < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -241,7 +239,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
|||
|
||||
if (pRead->curInvalid || pRead->curVersion != fetchVer) {
|
||||
if (walReadSeekVer(pRead, fetchVer) < 0) {
|
||||
ASSERT(0);
|
||||
pRead->curVersion = fetchVer;
|
||||
pRead->curInvalid = 1;
|
||||
return -1;
|
||||
|
@ -262,7 +259,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
|||
} else {
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
}
|
||||
ASSERT(0);
|
||||
pRead->curInvalid = 1;
|
||||
return -1;
|
||||
}
|
||||
|
@ -299,7 +295,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
|
|||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
}
|
||||
pRead->curInvalid = 1;
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -308,7 +303,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
|
|||
pRead->pHead->head.version, ver);
|
||||
pRead->curInvalid = 1;
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -316,7 +310,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
|
|||
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
|
||||
pRead->curInvalid = 1;
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -328,14 +321,10 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
|
|||
static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
|
||||
int64_t code;
|
||||
|
||||
ASSERT(pRead->curVersion == pRead->pHead->head.version);
|
||||
ASSERT(pRead->curInvalid == 0);
|
||||
|
||||
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
|
||||
if (code < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
pRead->curInvalid = 1;
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -384,7 +373,6 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
|||
} else {
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
}
|
||||
ASSERT(0);
|
||||
pRead->curInvalid = 1;
|
||||
return -1;
|
||||
}
|
||||
|
@ -410,9 +398,6 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
|
|||
pRead->pWal->cfg.vgId, pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
|
||||
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer);
|
||||
|
||||
ASSERT(pRead->curVersion == pHead->head.version);
|
||||
ASSERT(pRead->curInvalid == 0);
|
||||
|
||||
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
|
||||
if (code < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -447,7 +432,6 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
|||
|
||||
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
|
||||
if (pReadHead->bodyLen < 0) {
|
||||
ASSERT(0);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
|
||||
pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno));
|
||||
|
@ -457,12 +441,10 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
|||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
}
|
||||
pRead->curInvalid = 1;
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pReadHead->version != ver) {
|
||||
ASSERT(0);
|
||||
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
|
||||
pReadHead->version, ver);
|
||||
pRead->curInvalid = 1;
|
||||
|
@ -471,7 +453,6 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
|||
}
|
||||
|
||||
if (walValidBodyCksum(*ppHead) != 0) {
|
||||
ASSERT(0);
|
||||
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
|
||||
ver);
|
||||
pRead->curInvalid = 1;
|
||||
|
|
|
@ -61,7 +61,7 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
|
|||
SWalFileInfo tmpInfo;
|
||||
tmpInfo.firstVer = ver;
|
||||
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
||||
ASSERT(pRet != NULL);
|
||||
/*A(pRet != NULL);*/
|
||||
pRef->refFile = pRet->firstVer;
|
||||
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
|
@ -80,6 +80,7 @@ void walUnrefVer(SWalRef *pRef) {
|
|||
SWalRef *walRefCommittedVer(SWal *pWal) {
|
||||
SWalRef *pRef = walOpenRef(pWal);
|
||||
if (pRef == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
taosThreadMutexLock(&pWal->mutex);
|
||||
|
@ -91,7 +92,7 @@ SWalRef *walRefCommittedVer(SWal *pWal) {
|
|||
SWalFileInfo tmpInfo;
|
||||
tmpInfo.firstVer = ver;
|
||||
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
||||
ASSERT(pRet != NULL);
|
||||
/*A(pRet != NULL);*/
|
||||
pRef->refFile = pRet->firstVer;
|
||||
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
|
|
|
@ -40,7 +40,6 @@ static int64_t walSeekWritePos(SWal* pWal, int64_t ver) {
|
|||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
ASSERT(entry.ver == ver);
|
||||
code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
|
||||
if (code < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -53,8 +52,7 @@ static int64_t walSeekWritePos(SWal* pWal, int64_t ver) {
|
|||
int walInitWriteFile(SWal* pWal) {
|
||||
TdFilePtr pIdxTFile, pLogTFile;
|
||||
SWalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet);
|
||||
ASSERT(pRet != NULL);
|
||||
int64_t fileFirstVer = pRet->firstVer;
|
||||
int64_t fileFirstVer = pRet->firstVer;
|
||||
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
walBuildIdxName(pWal, fileFirstVer, fnameStr);
|
||||
|
@ -109,9 +107,8 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) {
|
|||
tmpInfo.firstVer = ver;
|
||||
// bsearch in fileSet
|
||||
int32_t idx = taosArraySearchIdx(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
||||
ASSERT(idx != -1);
|
||||
/*A(idx != -1);*/
|
||||
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, idx);
|
||||
/*ASSERT(pFileInfo != NULL);*/
|
||||
|
||||
int64_t fileFirstVer = pFileInfo->firstVer;
|
||||
walBuildIdxName(pWal, fileFirstVer, fnameStr);
|
||||
|
|
|
@ -87,12 +87,10 @@ int32_t walApplyVer(SWal *pWal, int64_t ver) {
|
|||
}
|
||||
|
||||
int32_t walCommit(SWal *pWal, int64_t ver) {
|
||||
ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer);
|
||||
ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer);
|
||||
if (ver < pWal->vers.commitVer) {
|
||||
return 0;
|
||||
}
|
||||
if (ver > pWal->vers.lastVer) {
|
||||
if (ver > pWal->vers.lastVer || pWal->vers.commitVer < pWal->vers.snapshotVer) {
|
||||
terrno = TSDB_CODE_WAL_INVALID_VER;
|
||||
return -1;
|
||||
}
|
||||
|
@ -138,25 +136,21 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
||||
|
||||
if (pIdxFile == NULL) {
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
|
||||
code = taosLSeekFile(pIdxFile, idxOff, SEEK_SET);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
// read idx file and get log file pos
|
||||
SWalIdxEntry entry;
|
||||
if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
ASSERT(entry.ver == ver);
|
||||
|
||||
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
|
||||
TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
||||
|
@ -176,24 +170,19 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
}
|
||||
// validate offset
|
||||
SWalCkHead head;
|
||||
ASSERT(taosValidFile(pLogFile));
|
||||
int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead));
|
||||
int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead));
|
||||
if (size != sizeof(SWalCkHead)) {
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
code = walValidHeadCksum(&head);
|
||||
|
||||
ASSERT(code == 0);
|
||||
if (code != 0) {
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
if (head.head.version != ver) {
|
||||
ASSERT(0);
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
|
@ -202,22 +191,22 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
// truncate old files
|
||||
code = taosFtruncateFile(pLogFile, entry.offset);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
code = taosFtruncateFile(pIdxFile, idxOff);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
pWal->vers.lastVer = ver - 1;
|
||||
#if 0
|
||||
if (pWal->vers.lastVer < pWal->vers.firstVer) {
|
||||
ASSERT(pWal->vers.lastVer == pWal->vers.firstVer - 1);
|
||||
A(pWal->vers.lastVer == pWal->vers.firstVer - 1);
|
||||
}
|
||||
#endif
|
||||
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
|
||||
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;
|
||||
taosCloseFile(&pIdxFile);
|
||||
|
@ -386,7 +375,8 @@ int32_t walEndSnapshot(SWal *pWal) {
|
|||
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
|
||||
wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
|
||||
if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) {
|
||||
ASSERT(0);
|
||||
wError("vgId:%d, failed to remove idx file %s due to %s", pWal->cfg.vgId, fnameStr, strerror(errno));
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
taosArrayClear(pWal->toDeleteFiles);
|
||||
|
@ -441,7 +431,6 @@ int32_t walRollImpl(SWal *pWal) {
|
|||
pWal->pIdxFile = pIdxFile;
|
||||
pWal->pLogFile = pLogFile;
|
||||
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
||||
ASSERT(pWal->writeCur >= 0);
|
||||
|
||||
pWal->lastRollSeq = walGetSeq();
|
||||
|
||||
|
@ -458,8 +447,7 @@ END:
|
|||
static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
||||
SWalIdxEntry entry = {.ver = ver, .offset = offset};
|
||||
SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal);
|
||||
ASSERT(pFileInfo != NULL);
|
||||
ASSERT(pFileInfo->firstVer >= 0);
|
||||
|
||||
int64_t idxOffset = (entry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry);
|
||||
wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset,
|
||||
idxOffset);
|
||||
|
@ -476,7 +464,6 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
|||
if (endOffset < 0) {
|
||||
wFatal("vgId:%d, failed to seek end of idxfile due to %s. ver:%" PRId64 "", pWal->cfg.vgId, strerror(errno), ver);
|
||||
}
|
||||
ASSERT(endOffset == idxOffset + sizeof(SWalIdxEntry) && "Offset of idx entries misaligned");
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -486,9 +473,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
|
|||
|
||||
int64_t offset = walGetCurFileOffset(pWal);
|
||||
SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal);
|
||||
ASSERT(pFileInfo != NULL);
|
||||
|
||||
ASSERT(pFileInfo->firstVer != -1);
|
||||
pWal->writeHead.head.version = index;
|
||||
pWal->writeHead.head.bodyLen = bodyLen;
|
||||
pWal->writeHead.head.msgType = msgType;
|
||||
|
@ -525,7 +510,6 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
|
|||
|
||||
// set status
|
||||
if (pWal->vers.firstVer == -1) {
|
||||
ASSERT(index == 0);
|
||||
pWal->vers.firstVer = 0;
|
||||
}
|
||||
pWal->vers.lastVer = index;
|
||||
|
@ -541,7 +525,6 @@ END:
|
|||
wFatal("vgId:%d, failed to ftruncate logfile to offset:%" PRId64 " during recovery due to %s", pWal->cfg.vgId,
|
||||
offset, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
ASSERT(0 && "failed to recover from error");
|
||||
}
|
||||
|
||||
int64_t idxOffset = (index - pFileInfo->firstVer) * sizeof(SWalIdxEntry);
|
||||
|
@ -549,7 +532,6 @@ END:
|
|||
wFatal("vgId:%d, failed to ftruncate idxfile to offset:%" PRId64 "during recovery due to %s", pWal->cfg.vgId,
|
||||
idxOffset, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
ASSERT(0 && "failed to recover from error");
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
@ -576,8 +558,6 @@ int64_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syn
|
|||
}
|
||||
}
|
||||
|
||||
ASSERT(pWal->pLogFile != NULL && pWal->pIdxFile != NULL && pWal->writeCur >= 0);
|
||||
|
||||
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
|
@ -614,8 +594,6 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSync
|
|||
}
|
||||
}
|
||||
|
||||
ASSERT(pWal->pIdxFile != NULL && pWal->pLogFile != NULL && pWal->writeCur >= 0);
|
||||
|
||||
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
|
|
Loading…
Reference in New Issue