diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index d6071ebcf3..592672b32b 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -159,6 +159,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ae36ac7216..a10dd8eda2 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -400,6 +400,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) //"Unexpected generic error in wal") #define TSDB_CODE_WAL_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x1001) //"WAL file is corrupted") #define TSDB_CODE_WAL_SIZE_LIMIT TAOS_DEF_ERROR_CODE(0, 0x1002) //"WAL size exceeds limit") +#define TSDB_CODE_WAL_INVALID_VER TAOS_DEF_ERROR_CODE(0, 0x1003) //"WAL invalid version") +#define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004) //"WAL out of memory") // tfs #define TSDB_CODE_FS_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x2200) //"tfs out of memory") diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index 60f186d7d2..68ba08b66e 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -25,11 +25,8 @@ extern "C" { int32_t mndInitConsumer(SMnode *pMnode); void mndCleanupConsumer(SMnode *pMnode); -SConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId); -void mndReleaseConsumer(SMnode *pMnode, SConsumerObj *pConsumer); - -SCGroupObj *mndAcquireCGroup(SMnode *pMnode, char *consumerGroup); -void mndReleaseCGroup(SMnode *pMnode, SCGroupObj *pCGroup); +SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId); +void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 1af9b840a9..b490eeac96 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -302,6 +302,7 @@ typedef struct { char payload[]; } SShowObj; +#if 0 typedef struct SConsumerObj { uint64_t uid; int64_t createTime; @@ -309,17 +310,59 @@ typedef struct SConsumerObj { //uint64_t dbUid; int32_t version; SRWLatch lock; - SList* topics; + SArray* topics; } SConsumerObj; +typedef struct SMqTopicConsumer { + int64_t consumerId; + SList* topicList; +} SMqTopicConsumer; +#endif + +typedef struct SMqCGroup { + char name[TSDB_CONSUMER_GROUP_LEN]; + int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal + SList *consumerIds; // SList + SList *idleVGroups; // SList +} SMqCGroup; + +typedef struct SMqTopicObj { + char name[TSDB_TOPIC_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + int64_t createTime; + int64_t updateTime; + uint64_t uid; + uint64_t dbUid; + int32_t version; + SRWLatch lock; + int32_t sqlLen; + char *sql; + char *logicalPlan; + char *physicalPlan; + SHashObj *cgroups; // SHashObj +} SMqTopicObj; + +// TODO: add cache and change name to id +typedef struct SMqConsumerTopic { + char name[TSDB_TOPIC_FNAME_LEN]; + SList *vgroups; // SList +} SMqConsumerTopic; + +typedef struct SMqConsumerObj { + SRWLatch lock; + int64_t consumerId; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; + SArray *topics; // SArray +} SMqConsumerObj; + typedef struct SMqSubConsumerObj { int64_t consumerUid; // if -1, unassigned - SList* vgId; //SList + SList *vgId; // SList } SMqSubConsumerObj; typedef struct SMqSubCGroupObj { char name[TSDB_CONSUMER_GROUP_LEN]; - SList* consumers; //SList + SList *consumers; // SList } SMqSubCGroupObj; typedef struct SMqSubTopicObj { @@ -332,32 +375,33 @@ typedef struct SMqSubTopicObj { int32_t version; SRWLatch lock; int32_t sqlLen; - char* sql; - char* logicalPlan; - char* physicalPlan; - SList* cgroups; //SList + char *sql; + char *logicalPlan; + char *physicalPlan; + SList *cgroups; // SList } SMqSubTopicObj; typedef struct SMqConsumerSubObj { int64_t topicUid; - SList* vgIds; //SList + SList *vgIds; // SList } SMqConsumerSubObj; typedef struct SMqConsumerHbObj { int64_t consumerId; - SList* consumerSubs; //SList + SList *consumerSubs; // SList } SMqConsumerHbObj; typedef struct SMqVGroupSubObj { int64_t topicUid; - SList* consumerIds; //SList + SList *consumerIds; // SList } SMqVGroupSubObj; typedef struct SMqVGroupHbObj { int64_t vgId; - SList* vgSubs; //SList + SList *vgSubs; // SList } SMqVGroupHbObj; +#if 0 typedef struct SCGroupObj { char name[TSDB_TOPIC_FNAME_LEN]; int64_t createTime; @@ -368,24 +412,7 @@ typedef struct SCGroupObj { SRWLatch lock; SList* consumerIds; } SCGroupObj; - -typedef struct { - char name[TSDB_TOPIC_FNAME_LEN]; - char db[TSDB_DB_FNAME_LEN]; - int64_t createTime; - int64_t updateTime; - uint64_t uid; - uint64_t dbUid; - int32_t version; - SRWLatch lock; - int32_t execLen; - void* executor; - int32_t sqlLen; - char* sql; - char* logicalPlan; - char* physicalPlan; - SList* consumerIds; -} STopicObj; +#endif typedef struct SMnodeMsg { char user[TSDB_USER_LEN]; diff --git a/source/dnode/mnode/impl/inc/mndTopic.h b/source/dnode/mnode/impl/inc/mndTopic.h index d092f47d4b..fd82c60d37 100644 --- a/source/dnode/mnode/impl/inc/mndTopic.h +++ b/source/dnode/mnode/impl/inc/mndTopic.h @@ -25,8 +25,11 @@ extern "C" { int32_t mndInitTopic(SMnode *pMnode); void mndCleanupTopic(SMnode *pMnode); -STopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName); -void mndReleaseTopic(SMnode *pMnode, STopicObj *pTopic); +SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName); +void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic); + +SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic); +SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 5391b0f73e..eeb011c084 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -24,16 +24,17 @@ #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" +#include "tcompare.h" #include "tname.h" #define MND_CONSUMER_VER_NUMBER 1 #define MND_CONSUMER_RESERVE_SIZE 64 -static SSdbRaw *mndConsumerActionEncode(SConsumerObj *pConsumer); +static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer); static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw); -static int32_t mndConsumerActionInsert(SSdb *pSdb, SConsumerObj *pConsumer); -static int32_t mndConsumerActionDelete(SSdb *pSdb, SConsumerObj *pConsumer); -static int32_t mndConsumerActionUpdate(SSdb *pSdb, SConsumerObj *pConsumer, SConsumerObj *pNewConsumer); +static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer); +static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer); +static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer); static int32_t mndProcessCreateConsumerMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropConsumerMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg); @@ -57,8 +58,8 @@ int32_t mndInitConsumer(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndConsumerActionDelete}; mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); - mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp); - mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq); + /*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/ + /*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/ mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); return sdbSetTable(pMnode->pSdb, table); @@ -66,85 +67,41 @@ int32_t mndInitConsumer(SMnode *pMnode) { void mndCleanupConsumer(SMnode *pMnode) {} -static SSdbRaw *mndCGroupActionEncode(SCGroupObj *pCGroup) { - int32_t size = sizeof(SConsumerObj) + MND_CONSUMER_RESERVE_SIZE; +static void *mndBuildMqVGroupSetReq(SMnode *pMnode, char *topicName, int32_t vgId, int64_t consumerId, char *cgroup) { + return 0; +} + +static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { + int32_t size = sizeof(SMqConsumerObj) + MND_CONSUMER_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size); if (pRaw == NULL) return NULL; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pCGroup->name, TSDB_TABLE_FNAME_LEN); - SDB_SET_INT64(pRaw, dataPos, pCGroup->createTime); - SDB_SET_INT64(pRaw, dataPos, pCGroup->updateTime); - SDB_SET_INT64(pRaw, dataPos, pCGroup->uid); - /*SDB_SET_INT64(pRaw, dataPos, pConsumer->dbUid);*/ - SDB_SET_INT32(pRaw, dataPos, pCGroup->version); - - int32_t sz = listNEles(pCGroup->consumerIds); - SDB_SET_INT32(pRaw, dataPos, sz); - - SListIter iter; - tdListInitIter(pCGroup->consumerIds, &iter, TD_LIST_FORWARD); - SListNode *pn = NULL; - while ((pn = tdListNext(&iter)) != NULL) { - int64_t consumerId = *(int64_t *)pn->data; - SDB_SET_INT64(pRaw, dataPos, consumerId); + int32_t topicNum = taosArrayGetSize(pConsumer->topics); + SDB_SET_INT64(pRaw, dataPos, pConsumer->consumerId); + int32_t len = strlen(pConsumer->cgroup); + SDB_SET_INT32(pRaw, dataPos, len); + SDB_SET_BINARY(pRaw, dataPos, pConsumer->cgroup, len); + SDB_SET_INT32(pRaw, dataPos, topicNum); + for (int i = 0; i < topicNum; i++) { + int32_t len; + SMqConsumerTopic *pConsumerTopic = taosArrayGet(pConsumer->topics, i); + len = strlen(pConsumerTopic->name); + SDB_SET_INT32(pRaw, dataPos, len); + SDB_SET_BINARY(pRaw, dataPos, pConsumerTopic->name, len); + int vgSize; + if (pConsumerTopic->vgroups == NULL) { + vgSize = 0; + } else { + vgSize = listNEles(pConsumerTopic->vgroups); + } + SDB_SET_INT32(pRaw, dataPos, vgSize); + for (int j = 0; j < vgSize; j++) { + // SList* head; + /*SDB_SET_INT64(pRaw, dataPos, 0[> change to list item <]);*/ + } } - SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE); - SDB_SET_DATALEN(pRaw, dataPos); - return pRaw; -} - -static SSdbRow *mndCGroupActionDecode(SSdbRaw *pRaw) { - int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; - - if (sver != MND_CONSUMER_VER_NUMBER) { - terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - mError("failed to decode cgroup since %s", terrstr()); - return NULL; - } - - // TODO: maximum size is not known - int32_t size = sizeof(SCGroupObj) + 128 * sizeof(int64_t); - SSdbRow *pRow = sdbAllocRow(size); - SCGroupObj *pCGroup = sdbGetRowObj(pRow); - if (pCGroup == NULL) return NULL; - - int32_t dataPos = 0; - SDB_GET_BINARY(pRaw, pRow, dataPos, pCGroup->name, TSDB_TABLE_FNAME_LEN); - SDB_GET_INT64(pRaw, pRow, dataPos, &pCGroup->createTime); - SDB_GET_INT64(pRaw, pRow, dataPos, &pCGroup->updateTime); - SDB_GET_INT64(pRaw, pRow, dataPos, &pCGroup->uid); - /*SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->dbUid);*/ - SDB_GET_INT32(pRaw, pRow, dataPos, &pCGroup->version); - - int32_t sz; - SDB_GET_INT32(pRaw, pRow, dataPos, &sz); - // TODO: free list when failing - tdListInit(pCGroup->consumerIds, sizeof(int64_t)); - for (int i = 0; i < sz; i++) { - int64_t consumerId; - SDB_GET_INT64(pRaw, pRow, dataPos, &consumerId); - tdListAppend(pCGroup->consumerIds, &consumerId); - } - - SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_CONSUMER_RESERVE_SIZE); - return pRow; -} - -static SSdbRaw *mndConsumerActionEncode(SConsumerObj *pConsumer) { - int32_t size = sizeof(SConsumerObj) + MND_CONSUMER_RESERVE_SIZE; - SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size); - if (pRaw == NULL) return NULL; - - int32_t dataPos = 0; - SDB_SET_INT64(pRaw, dataPos, pConsumer->uid); - SDB_SET_INT64(pRaw, dataPos, pConsumer->createTime); - SDB_SET_INT64(pRaw, dataPos, pConsumer->updateTime); - /*SDB_SET_INT64(pRaw, dataPos, pConsumer->dbUid);*/ - SDB_SET_INT32(pRaw, dataPos, pConsumer->version); - SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE); SDB_SET_DATALEN(pRaw, dataPos); @@ -161,56 +118,67 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { return NULL; } - int32_t size = sizeof(SConsumerObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); - SSdbRow *pRow = sdbAllocRow(size); - SConsumerObj *pConsumer = sdbGetRowObj(pRow); + int32_t size = sizeof(SMqConsumerObj); + SSdbRow *pRow = sdbAllocRow(size); + SMqConsumerObj *pConsumer = sdbGetRowObj(pRow); if (pConsumer == NULL) return NULL; int32_t dataPos = 0; - SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->uid); - SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->createTime); - SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->updateTime); - /*SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->dbUid);*/ - SDB_GET_INT32(pRaw, pRow, dataPos, &pConsumer->version); + SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->consumerId); + int32_t len, topicNum; + SDB_GET_INT32(pRaw, pRow, dataPos, &len); + SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumer->cgroup, len); + SDB_GET_INT32(pRaw, pRow, dataPos, &topicNum); + for (int i = 0; i < topicNum; i++) { + int32_t topicLen; + SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic)); + if (pConsumerTopic == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + // TODO + return NULL; + } + /*pConsumerTopic->vgroups = taosArrayInit(topicNum, sizeof(SMqConsumerTopic));*/ + SDB_GET_INT32(pRaw, pRow, dataPos, &topicLen); + SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumerTopic->name, topicLen); + int32_t vgSize; + SDB_GET_INT32(pRaw, pRow, dataPos, &vgSize); + } SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_CONSUMER_RESERVE_SIZE); return pRow; } -static int32_t mndConsumerActionInsert(SSdb *pSdb, SConsumerObj *pConsumer) { - mTrace("consumer:%ld, perform insert action", pConsumer->uid); +static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) { + mTrace("consumer:%ld, perform insert action", pConsumer->consumerId); return 0; } -static int32_t mndConsumerActionDelete(SSdb *pSdb, SConsumerObj *pConsumer) { - mTrace("consumer:%ld, perform delete action", pConsumer->uid); +static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { + mTrace("consumer:%ld, perform delete action", pConsumer->consumerId); return 0; } -static int32_t mndConsumerActionUpdate(SSdb *pSdb, SConsumerObj *pOldConsumer, SConsumerObj *pNewConsumer) { - mTrace("consumer:%ld, perform update action", pOldConsumer->uid); - atomic_exchange_32(&pOldConsumer->updateTime, pNewConsumer->updateTime); - atomic_exchange_32(&pOldConsumer->version, pNewConsumer->version); - - taosWLockLatch(&pOldConsumer->lock); +static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) { + mTrace("consumer:%ld, perform update action", pOldConsumer->consumerId); // TODO handle update + /*taosWLockLatch(&pOldConsumer->lock);*/ + /*taosWUnLockLatch(&pOldConsumer->lock);*/ - taosWUnLockLatch(&pOldConsumer->lock); return 0; } -SConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId) { - SSdb *pSdb = pMnode->pSdb; - SConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId); +SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId) { + SSdb *pSdb = pMnode->pSdb; + SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId); if (pConsumer == NULL) { /*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/ } return pConsumer; } -void mndReleaseConsumer(SMnode *pMnode, SConsumerObj *pConsumer) { +void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) { SSdb *pSdb = pMnode->pSdb; sdbRelease(pSdb, pConsumer); } @@ -220,48 +188,185 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { char *msgStr = pMsg->rpcMsg.pCont; SCMSubscribeReq *pSubscribe; tDeserializeSCMSubscribeReq(msgStr, pSubscribe); - int topicNum = pSubscribe->topicNum; int64_t consumerId = pSubscribe->consumerId; char *consumerGroup = pSubscribe->consumerGroup; - // get consumer group and add client into it - SCGroupObj *pCGroup = sdbAcquire(pMnode->pSdb, SDB_CGROUP, consumerGroup); - if (pCGroup != NULL) { - // iterate the list until finding the consumer - // add consumer to cgroup list if not found - // put new record + + SArray *newSub = NULL; + int newTopicNum = pSubscribe->topicNum; + if (newTopicNum) { + newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic)); + } + for (int i = 0; i < newTopicNum; i++) { + char *topic = pSubscribe->topicName[i]; + SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic)); + if (pConsumerTopic == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + // TODO: free + return -1; + } + pConsumerTopic->vgroups = tdListNew(sizeof(int64_t)); + taosArrayPush(newSub, pConsumerTopic); + free(pConsumerTopic); + } + taosArraySortString(newSub, taosArrayCompareString); + + SArray *oldSub = NULL; + int oldTopicNum = 0; + SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); + if (pConsumer == NULL) { + // create consumer + pConsumer = malloc(sizeof(SMqConsumerObj)); + if (pConsumer == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + strcpy(pConsumer->cgroup, pSubscribe->consumerGroup); + + } else { + oldSub = pConsumer->topics; + oldTopicNum = taosArrayGetSize(oldSub); + } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); + if (pTrans == NULL) { + return -1; } - SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); - if (pConsumer != NULL) { - //reset topic list - } - - for (int i = 0; i < topicNum; i++) { - char *topicName = pSubscribe->topicName[i]; - STopicObj *pTopic = mndAcquireTopic(pMnode, topicName); - //get - // consumer id - SList *list = pTopic->consumerIds; - // add the consumer if not in the list - // - SList* topicList = pConsumer->topics; - //add to topic + int i = 0, j = 0; + while (i < newTopicNum || j < oldTopicNum) { + SMqConsumerTopic *pOldTopic = NULL; + SMqConsumerTopic *pNewTopic = NULL; + if (i >= newTopicNum) { + // encode unset topic msg to all vnodes related to that topic + pOldTopic = taosArrayGet(oldSub, j); + j++; + } else if (j >= oldTopicNum) { + pNewTopic = taosArrayGet(newSub, i); + } else { + pNewTopic = taosArrayGet(newSub, i); + pOldTopic = taosArrayGet(oldSub, j); + + char *newName = pNewTopic->name; + char *oldName = pOldTopic->name; + int comp = compareLenPrefixedStr(newName, oldName); + if (comp == 0) { + // do nothing + pOldTopic = pNewTopic = NULL; + i++; + j++; + continue; + } else if (comp < 0) { + pOldTopic = NULL; + i++; + } else { + pNewTopic = NULL; + j++; + } + } + + if (pOldTopic != NULL) { + ASSERT(pNewTopic == NULL); + char *oldTopicName = pOldTopic->name; + SList *vgroups = pOldTopic->vgroups; + SListIter iter; + tdListInitIter(vgroups, &iter, TD_LIST_FORWARD); + SListNode *pn; + + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName); + ASSERT(pTopic != NULL); + SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup)); + while ((pn = tdListNext(&iter)) != NULL) { + int32_t vgId = *(int64_t *)pn->data; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + // TODO release + if (pVgObj == NULL) { + // TODO handle error + continue; + } + // acquire and get epset + void *pMqVgSetReq = + mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, pSubscribe->consumerId, pSubscribe->consumerGroup); + // TODO:serialize + if (pMsg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgObj); + action.pCont = pMqVgSetReq; + action.contLen = 0; // TODO + action.msgType = TDMT_VND_MQ_SET_CONN; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMqVgSetReq); + mndTransDrop(pTrans); + // TODO free + return -1; + } + } + taosHashRemove(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup)); + + } else if (pNewTopic != NULL) { + ASSERT(pOldTopic == NULL); + + char *newTopicName = pNewTopic->name; + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName); + ASSERT(pTopic != NULL); + + SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup)); + if (pGroup == NULL) { + // add new group + pGroup = malloc(sizeof(SMqCGroup)); + if (pGroup == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pGroup->consumerIds = tdListNew(sizeof(int64_t)); + if (pGroup->consumerIds == NULL) { + free(pGroup); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pGroup->status = 0; + // add into cgroups + taosHashPut(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup), pGroup, + sizeof(SMqCGroup)); + } + + // put the consumer into list + // rebalance will be triggered by timer + tdListAppend(pGroup->consumerIds, &pSubscribe->consumerId); + + SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic); + sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY); + // TODO: error handling + mndTransAppendRedolog(pTrans, pTopicRaw); + } else { + ASSERT(0); + } + } + // destroy old sub + taosArrayDestroy(oldSub); + // put new sub into consumerobj + pConsumer->topics = newSub; + + // persist consumerObj + SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer); + sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); + // TODO: error handling + mndTransAppendRedolog(pTrans, pConsumerRaw); + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; } + // TODO: free memory + mndTransDrop(pTrans); return 0; } -static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg) { return 0; } - -static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg) { return 0; } - static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); - return 0; -} - static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; STableInfoMsg *pInfo = pMsg->rpcMsg.pCont; @@ -339,7 +444,7 @@ static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumO int32_t numOfConsumers = 0; void *pIter = NULL; while (1) { - SConsumerObj *pConsumer = NULL; + SMqConsumerObj *pConsumer = NULL; pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); if (pIter == NULL) break; @@ -402,49 +507,6 @@ static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMs return 0; } -static int32_t mndRetrieveCGroup(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pMsg->pMnode; - SSdb *pSdb = pMnode->pSdb; - int32_t numOfRows = 0; - SCGroupObj *pCGroup = NULL; - int32_t cols = 0; - char *pWrite; - char prefix[64] = {0}; - - tstrncpy(prefix, pShow->db, 64); - strcat(prefix, TS_PATH_DELIMITER); - int32_t prefixLen = (int32_t)strlen(prefix); - - while (numOfRows < rows) { - pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pCGroup); - if (pShow->pIter == NULL) break; - - if (strncmp(pCGroup->name, prefix, prefixLen) != 0) { - sdbRelease(pSdb, pCGroup); - continue; - } - - cols = 0; - - char consumerName[TSDB_TABLE_NAME_LEN] = {0}; - tstrncpy(consumerName, pCGroup->name + prefixLen, TSDB_TABLE_NAME_LEN); - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_TO_VARSTR(pWrite, consumerName); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pCGroup->createTime; - cols++; - - numOfRows++; - sdbRelease(pSdb, pCGroup); - } - - pShow->numOfReads += numOfRows; - mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); - return numOfRows; -} - static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 24e32e07b4..4f6567a50a 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -14,6 +14,7 @@ */ #define _DEFAULT_SOURCE +#include "mndTopic.h" #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" @@ -27,18 +28,16 @@ #define MND_TOPIC_VER_NUMBER 1 #define MND_TOPIC_RESERVE_SIZE 64 -static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic); -static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw); -static int32_t mndTopicActionInsert(SSdb *pSdb, STopicObj *pTopic); -static int32_t mndTopicActionDelete(SSdb *pSdb, STopicObj *pTopic); -static int32_t mndTopicActionUpdate(SSdb *pSdb, STopicObj *pTopic, STopicObj *pNewTopic); -static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg); -static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg); -static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg); -static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg); -static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); -static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); -static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); +static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic); +static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic); +static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic); +static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg); +static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg); +static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg); +static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg); +static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); int32_t mndInitTopic(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_TOPIC, @@ -58,8 +57,9 @@ int32_t mndInitTopic(SMnode *pMnode) { void mndCleanupTopic(SMnode *pMnode) {} -static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic) { - int32_t size = sizeof(STopicObj) + MND_TOPIC_RESERVE_SIZE; +SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { + int32_t len; + int32_t size = sizeof(SMqTopicObj) + MND_TOPIC_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size); if (pRaw == NULL) return NULL; @@ -71,10 +71,15 @@ static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic) { SDB_SET_INT64(pRaw, dataPos, pTopic->uid); SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid); SDB_SET_INT32(pRaw, dataPos, pTopic->version); - SDB_SET_INT32(pRaw, dataPos, pTopic->execLen); - SDB_SET_BINARY(pRaw, dataPos, pTopic->executor, pTopic->execLen); SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen); + SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen); + len = strlen(pTopic->logicalPlan); + SDB_SET_INT32(pRaw, dataPos, len); + SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len); + len = strlen(pTopic->physicalPlan); + SDB_SET_INT32(pRaw, dataPos, len); + SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len); SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE); SDB_SET_DATALEN(pRaw, dataPos); @@ -82,7 +87,7 @@ static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic) { return pRaw; } -static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { +SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; @@ -92,11 +97,12 @@ static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { return NULL; } - int32_t size = sizeof(STopicObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); - SSdbRow *pRow = sdbAllocRow(size); - STopicObj *pTopic = sdbGetRowObj(pRow); + int32_t size = sizeof(SMqTopicObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); + SSdbRow *pRow = sdbAllocRow(size); + SMqTopicObj *pTopic = sdbGetRowObj(pRow); if (pTopic == NULL) return NULL; + int32_t len; int32_t dataPos = 0; SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN); SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->db, TSDB_DB_FNAME_LEN); @@ -105,49 +111,51 @@ static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->uid); SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->dbUid); SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->version); - SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->execLen); - SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->executor, pTopic->execLen); SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->sqlLen); SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->sql, pTopic->sqlLen); + SDB_GET_INT32(pRaw, pRow, dataPos, &len); + SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->logicalPlan, len); + SDB_GET_INT32(pRaw, pRow, dataPos, &len); + SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->physicalPlan, len); SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_TOPIC_RESERVE_SIZE); return pRow; } -static int32_t mndTopicActionInsert(SSdb *pSdb, STopicObj *pTopic) { +static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) { mTrace("topic:%s, perform insert action", pTopic->name); return 0; } -static int32_t mndTopicActionDelete(SSdb *pSdb, STopicObj *pTopic) { +static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) { mTrace("topic:%s, perform delete action", pTopic->name); return 0; } -static int32_t mndTopicActionUpdate(SSdb *pSdb, STopicObj *pOldTopic, STopicObj *pNewTopic) { +static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) { mTrace("topic:%s, perform update action", pOldTopic->name); atomic_exchange_32(&pOldTopic->updateTime, pNewTopic->updateTime); atomic_exchange_32(&pOldTopic->version, pNewTopic->version); taosWLockLatch(&pOldTopic->lock); - - //TODO handle update + + // TODO handle update taosWUnLockLatch(&pOldTopic->lock); return 0; } -STopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) { - SSdb *pSdb = pMnode->pSdb; - STopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName); +SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) { + SSdb *pSdb = pMnode->pSdb; + SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName); if (pTopic == NULL) { terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; } return pTopic; } -void mndReleaseTopic(SMnode *pMnode, STopicObj *pTopic) { +void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) { SSdb *pSdb = pMnode->pSdb; sdbRelease(pSdb, pTopic); } @@ -162,7 +170,7 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { return mndAcquireDb(pMnode, db); } -static SDDropTopicMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, STopicObj *pTopic) { +static SDDropTopicMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) { int32_t contLen = sizeof(SDDropTopicMsg); SDDropTopicMsg *pDrop = calloc(1, contLen); @@ -185,7 +193,7 @@ static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *pCreate) { } static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) { - STopicObj topicObj = {0}; + SMqTopicObj topicObj = {0}; tstrncpy(topicObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); topicObj.createTime = taosGetTimestampMs(); @@ -197,13 +205,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj); if (pTopicRaw == NULL) return -1; if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1; + // TODO: replace with trans to support recovery return sdbWrite(pMnode->pSdb, pTopicRaw); } static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - char *msgStr = pMsg->rpcMsg.pCont; - SCMCreateTopicReq* pCreate; + SMnode *pMnode = pMsg->pMnode; + char *msgStr = pMsg->rpcMsg.pCont; + SCMCreateTopicReq *pCreate; tDeserializeSCMCreateTopicReq(msgStr, pCreate); mDebug("topic:%s, start to create", pCreate->name); @@ -213,7 +222,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { return -1; } - STopicObj *pTopic = mndAcquireTopic(pMnode, pCreate->name); + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pCreate->name); if (pTopic != NULL) { sdbRelease(pMnode->pSdb, pTopic); if (pCreate->igExists) { @@ -245,9 +254,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, STopicObj *pTopic) { - return 0; -} +static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, SMqTopicObj *pTopic) { return 0; } static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; @@ -255,7 +262,7 @@ static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) { mDebug("topic:%s, start to drop", pDrop->name); - STopicObj *pTopic = mndAcquireTopic(pMnode, pDrop->name); + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pDrop->name); if (pTopic == NULL) { if (pDrop->igNotExists) { mDebug("topic:%s, not exist, ignore not exist is set", pDrop->name); @@ -361,7 +368,7 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo int32_t numOfTopics = 0; void *pIter = NULL; while (1) { - STopicObj *pTopic = NULL; + SMqTopicObj *pTopic = NULL; pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); if (pIter == NULL) break; @@ -440,13 +447,13 @@ static void mndExtractTableName(char *tableId, char *name) { } static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pMsg->pMnode; - SSdb *pSdb = pMnode->pSdb; - int32_t numOfRows = 0; - STopicObj *pTopic = NULL; - int32_t cols = 0; - char *pWrite; - char prefix[64] = {0}; + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SMqTopicObj *pTopic = NULL; + int32_t cols = 0; + char *pWrite; + char prefix[64] = {0}; tstrncpy(prefix, pShow->db, 64); strcat(prefix, TS_PATH_DELIMITER); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 0f155f9553..6164ec2baf 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -261,15 +261,18 @@ int walLoadMeta(SWal* pWal) { memset(buf, 0, size + 5); int tfd = tfOpenRead(fnameStr); if (tfRead(tfd, buf, size) != size) { + tfClose(tfd); free(buf); return -1; } // load into fileInfoSet int code = walMetaDeserialize(pWal, buf); if (code != 0) { + tfClose(tfd); free(buf); return -1; } + tfClose(tfd); free(buf); return 0; } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index c80fb4eed8..67c8694fd6 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -15,6 +15,7 @@ #include "tfile.h" #include "walInt.h" +#include "taoserror.h" SWalReadHandle *walOpenReadHandle(SWal *pWal) { SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle)); @@ -30,6 +31,7 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) { pRead->status = 0; pRead->pHead = malloc(sizeof(SWalHead)); if (pRead->pHead == NULL) { + terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; free(pRead); return NULL; } @@ -55,16 +57,19 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry); code = tfLseek(idxTfd, offset, SEEK_SET); if (code < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } SWalIdxEntry entry; if (tfRead(idxTfd, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } // TODO:deserialize ASSERT(entry.ver == ver); code = tfLseek(logTfd, entry.offset, SEEK_SET); if (code < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } return code; @@ -79,6 +84,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { walBuildLogName(pRead->pWal, fileFirstVer, fnameStr); int64_t logTfd = tfOpenRead(fnameStr); if (logTfd < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -100,6 +106,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { return 0; } if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { + terrno = TSDB_CODE_WAL_INVALID_VER; return -1; } if (ver < pWal->vers.snapshotVer) { @@ -113,7 +120,6 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { if (pRead->curFileFirstVer != pRet->firstVer) { code = walReadChangeFile(pRead, pRet->firstVer); if (code < 0) { - // TODO: set error flag return -1; } } @@ -132,7 +138,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { // TODO: check wal life if (pRead->curVersion != ver) { code = walReadSeekVer(pRead, ver); - if (code != 0) { + if (code < 0) { return -1; } } @@ -145,11 +151,13 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { } code = walValidHeadCksum(pRead->pHead); if (code != 0) { + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } if (pRead->capacity < pRead->pHead->head.len) { void *ptr = realloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.len); if (ptr == NULL) { + terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; return -1; } pRead->pHead = ptr; @@ -163,6 +171,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { code = walValidBodyCksum(pRead->pHead); if (code != 0) { + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } pRead->curVersion++; diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c index 1f9b7b6e58..1d9f7bdf4d 100644 --- a/source/libs/wal/src/walSeek.c +++ b/source/libs/wal/src/walSeek.c @@ -30,17 +30,20 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { int64_t idxOff = walGetVerIdxOffset(pWal, ver); code = tfLseek(idxTfd, idxOff, SEEK_SET); if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } SWalIdxEntry entry; // TODO:deserialize code = tfRead(idxTfd, &entry, sizeof(SWalIdxEntry)); if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } ASSERT(entry.ver == ver); code = tfLseek(logTfd, entry.offset, SEEK_CUR); if (code < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } return code; @@ -56,11 +59,13 @@ int walChangeFileToLast(SWal* pWal) { walBuildIdxName(pWal, fileFirstVer, fnameStr); idxTfd = tfOpenReadWrite(fnameStr); if (idxTfd < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } walBuildLogName(pWal, fileFirstVer, fnameStr); logTfd = tfOpenReadWrite(fnameStr); if (logTfd < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } // switch file @@ -76,11 +81,12 @@ int walChangeFile(SWal* pWal, int64_t ver) { code = tfClose(pWal->writeLogTfd); if (code != 0) { // TODO + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } code = tfClose(pWal->writeIdxTfd); if (code != 0) { - // TODO + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } SWalFileInfo tmpInfo; @@ -113,6 +119,7 @@ int walSeekVer(SWal* pWal, int64_t ver) { return 0; } if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { + terrno = TSDB_CODE_WAL_INVALID_VER; return -1; } if (ver < pWal->vers.snapshotVer) { diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index c8ffd9d07d..5e26dd8278 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -25,6 +25,7 @@ int32_t walCommit(SWal *pWal, int64_t ver) { ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer); ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer); if (ver < pWal->vers.commitVer || ver > pWal->vers.lastVer) { + terrno = TSDB_CODE_WAL_INVALID_VER; return -1; } pWal->vers.commitVer = ver; @@ -38,6 +39,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { return 0; } if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) { + terrno = TSDB_CODE_WAL_INVALID_VER; return -1; } pthread_mutex_lock(&pWal->mutex);