diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 11a38a3742..a788df2e32 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -50,9 +50,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET, "mq-set" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET_CUR, "mq-set-cur" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RSP_READY, "rsp-ready" ) - // message from client to mnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" ) @@ -225,6 +224,7 @@ typedef struct SBuildUseDBInput { int32_t vgVersion; } SBuildUseDBInput; + #pragma pack(push, 1) // null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta @@ -343,7 +343,7 @@ typedef struct { typedef struct { char tableFname[TSDB_TABLE_FNAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN]; - int16_t type; /* operation type */ + int16_t type; /* operation type */ int16_t numOfCols; /* number of schema */ int32_t tagValLen; SSchema schema[]; @@ -545,8 +545,8 @@ typedef struct { int32_t sqlstrLen; // sql query string int32_t prevResultLen; // previous result length int32_t numOfOperator; - int32_t tableScanOperator; // table scan operator. -1 means no scan operator - int32_t udfNum; // number of udf function + int32_t tableScanOperator;// table scan operator. -1 means no scan operator + int32_t udfNum; // number of udf function int32_t udfContentOffset; int32_t udfContentLen; SColumnInfo tableCols[]; @@ -1005,27 +1005,35 @@ typedef struct { // mq related typedef struct { + } SMqConnectReq; typedef struct { + } SMqConnectRsp; typedef struct { + } SMqDisconnectReq; typedef struct { + } SMqDisconnectRsp; typedef struct { + } SMqAckReq; typedef struct { + } SMqAckRsp; typedef struct { + } SMqResetReq; typedef struct { + } SMqResetRsp; // mq related end diff --git a/include/dnode/vnode/tq/tq.h b/include/dnode/vnode/tq/tq.h index 7993a8f1ab..60a8c252c0 100644 --- a/include/dnode/vnode/tq/tq.h +++ b/include/dnode/vnode/tq/tq.h @@ -16,86 +16,76 @@ #ifndef _TD_TQ_H_ #define _TD_TQ_H_ +#include "common.h" #include "mallocator.h" #include "os.h" +#include "taoserror.h" +#include "taosmsg.h" +#include "tlist.h" #include "tutil.h" #ifdef __cplusplus extern "C" { #endif -typedef struct TmqMsgHead { +typedef struct STqMsgHead { int32_t protoVer; int32_t msgType; int64_t cgId; int64_t clientId; -} TmqMsgHead; +} STqMsgHead; -typedef struct TmqOneAck { +typedef struct STqOneAck { int64_t topicId; int64_t consumeOffset; -} TmqOneAck; +} STqOneAck; -typedef struct TmqAcks { +typedef struct STqAcks { int32_t ackNum; // should be sorted - TmqOneAck acks[]; -} TmqAcks; + STqOneAck acks[]; +} STqAcks; -// TODO: put msgs into common -typedef struct TmqConnectReq { - TmqMsgHead head; - TmqAcks acks; -} TmqConnectReq; - -typedef struct TmqConnectRsp { - TmqMsgHead head; - int8_t status; -} TmqConnectRsp; - -typedef struct TmqDisconnectReq { - TmqMsgHead head; -} TmqDiscconectReq; - -typedef struct TmqDisconnectRsp { - TmqMsgHead head; - int8_t status; -} TmqDisconnectRsp; +typedef struct STqSetCurReq { + STqMsgHead head; + int64_t topicId; + int64_t offset; +} STqSetCurReq; typedef struct STqConsumeReq { - TmqMsgHead head; - TmqAcks acks; + STqMsgHead head; + STqAcks acks; } STqConsumeReq; -typedef struct TmqMsgContent { +typedef struct STqMsgContent { int64_t topicId; int64_t msgLen; char msg[]; -} TmqMsgContent; +} STqMsgContent; typedef struct STqConsumeRsp { - TmqMsgHead head; + STqMsgHead head; int64_t bodySize; - TmqMsgContent msgs[]; + STqMsgContent msgs[]; } STqConsumeRsp; -typedef struct TmqSubscribeReq { - TmqMsgHead head; +typedef struct STqSubscribeReq { + STqMsgHead head; int32_t topicNum; int64_t topic[]; -} TmqSubscribeReq; +} STqSubscribeReq; -typedef struct tmqSubscribeRsp { - TmqMsgHead head; +typedef struct STqSubscribeRsp { + STqMsgHead head; int64_t vgId; char ep[TSDB_EP_LEN]; // TSDB_EP_LEN -} TmqSubscribeRsp; +} STqSubscribeRsp; -typedef struct TmqHeartbeatReq { -} TmqHeartbeatReq; +typedef struct STqHeartbeatReq { +} STqHeartbeatReq; -typedef struct TmqHeartbeatRsp { -} TmqHeartbeatRsp; +typedef struct STqHeartbeatRsp { +} STqHeartbeatRsp; typedef struct STqTopicVhandle { int64_t topicId; @@ -108,48 +98,54 @@ typedef struct STqTopicVhandle { #define TQ_BUFFER_SIZE 8 +typedef struct STqExec { + void* runtimeEnv; + SSDataBlock* (*exec)(void* runtimeEnv); + void* (*assign)(void* runtimeEnv, SSubmitBlk* inputData); + void (*clear)(void* runtimeEnv); + char* (*serialize)(struct STqExec*); + struct STqExec* (*deserialize)(char*); +} STqExec; + typedef struct STqBufferItem { int64_t offset; // executors are identical but not concurrent // so there must be a copy in each item - void* executor; - int64_t size; - void* content; -} STqBufferItem; + STqExec* executor; + int32_t status; + int64_t size; + void* content; +} STqMsgItem; -typedef struct STqBufferHandle { +typedef struct STqTopic { // char* topic; //c style, end with '\0' // int64_t cgId; // void* ahandle; - int64_t nextConsumeOffset; - int64_t floatingCursor; - int64_t topicId; - int32_t head; - int32_t tail; - STqBufferItem buffer[TQ_BUFFER_SIZE]; -} STqBufferHandle; + int64_t nextConsumeOffset; + int64_t floatingCursor; + int64_t topicId; + int32_t head; + int32_t tail; + STqMsgItem buffer[TQ_BUFFER_SIZE]; +} STqTopic; typedef struct STqListHandle { - STqBufferHandle bufHandle; + STqTopic topic; struct STqListHandle* next; -} STqListHandle; +} STqList; -typedef struct STqGroupHandle { - int64_t cId; - int64_t cgId; - void* ahandle; - int32_t topicNum; - STqListHandle* head; -} STqGroupHandle; - -typedef struct STqQueryExec { - void* src; - STqBufferItem* dest; - void* executor; -} STqQueryExec; +typedef struct STqGroup { + int64_t clientId; + int64_t cgId; + void* ahandle; + int32_t topicNum; + STqList* head; + SList* topicList; // SList + void* returnMsg; // SVReadMsg +} STqGroup; typedef struct STqQueryMsg { - STqQueryExec* exec; + STqMsgItem* item; struct STqQueryMsg* next; } STqQueryMsg; @@ -209,15 +205,15 @@ typedef void (*FTqDelete)(void*); #define TQ_DUP_INTXN_REWRITE 0 #define TQ_DUP_INTXN_REJECT 2 -static inline bool TqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; } +static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; } -static inline bool TqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; } +static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; } static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST; #define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE -typedef struct TqMetaHandle { +typedef struct STqMetaHandle { int64_t key; int64_t offset; int64_t serializedSize; @@ -225,23 +221,25 @@ typedef struct TqMetaHandle { void* valueInTxn; } STqMetaHandle; -typedef struct TqMetaList { - STqMetaHandle handle; - struct TqMetaList* next; - // struct TqMetaList* inTxnPrev; - // struct TqMetaList* inTxnNext; - struct TqMetaList* unpersistPrev; - struct TqMetaList* unpersistNext; +typedef struct STqMetaList { + STqMetaHandle handle; + struct STqMetaList* next; + // struct STqMetaList* inTxnPrev; + // struct STqMetaList* inTxnNext; + struct STqMetaList* unpersistPrev; + struct STqMetaList* unpersistNext; } STqMetaList; -typedef struct TqMetaStore { +typedef struct STqMetaStore { STqMetaList* bucket[TQ_BUCKET_SIZE]; // a table head STqMetaList* unpersistHead; + // TODO:temporaral use, to be replaced by unified tfile int fileFd; // TODO:temporaral use, to be replaced by unified tfile - int idxFd; + int idxFd; + char* dirPath; int32_t tqConfigFlag; FTqSerialize pSerializer; @@ -250,8 +248,8 @@ typedef struct TqMetaStore { } STqMetaStore; typedef struct STQ { - // the collection of group handle - // the handle of kvstore + // the collection of groups + // the handle of meta kvstore char* path; STqCfg* tqConfig; STqLogReader* tqLogReader; @@ -266,23 +264,25 @@ void tqClose(STQ*); // void* will be replace by a msg type int tqPushMsg(STQ*, void* msg, int64_t version); int tqCommit(STQ*); -int tqSetCursor(STQ*, void* msg); - int tqConsume(STQ*, STqConsumeReq*); -STqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId); +int tqSetCursor(STQ*, STqSetCurReq* pMsg); +int tqBufferSetOffset(STqTopic*, int64_t offset); -STqGroupHandle* tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); -int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); -int tqMoveOffsetToNext(STqGroupHandle*); -int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset); -int tqRegisterContext(STqGroupHandle*, void* ahandle); -int tqLaunchQuery(STqGroupHandle*); -int tqSendLaunchQuery(STqGroupHandle*); +STqTopic* tqFindTopic(STqGroup*, int64_t topicId); -int tqSerializeGroupHandle(const STqGroupHandle* gHandle, STqSerializedHead** ppHead); +STqGroup* tqGetGroup(STQ*, int64_t clientId); -const void* tqDeserializeGroupHandle(const STqSerializedHead* pHead, STqGroupHandle** gHandle); +STqGroup* tqOpenGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); +int tqCloseGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); +int tqRegisterContext(STqGroup*, void* ahandle); +int tqSendLaunchQuery(STqMsgItem*, int64_t offset); + +int tqSerializeGroup(const STqGroup*, STqSerializedHead**); + +const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**); + +static int tqQueryExecuting(int32_t status) { return status; } #ifdef __cplusplus } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 75aab24f83..95824df5ac 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -339,6 +339,20 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909) //"Invalid msg length") #define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A) //"Invalid msg type") +// tq +#define TSDB_CODE_TQ_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0A00) //"Invalid configuration") +#define TSDB_CODE_TQ_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0A01) //"Tq init failed") +#define TSDB_CODE_TQ_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0A02) //"No diskspace for tq") +#define TSDB_CODE_TQ_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0A03) //"No permission for disk files") +#define TSDB_CODE_TQ_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0A04) //"Data file(s) corrupted") +#define TSDB_CODE_TQ_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0A05) //"Out of memory") +#define TSDB_CODE_TQ_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0A06) //"File already exists") +#define TSDB_CODE_TQ_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0A07) //"Failed to create dir") +#define TSDB_CODE_TQ_META_NO_SUCH_KEY TAOS_DEF_ERROR_CODE(0, 0x0A08) //"Target key not found") +#define TSDB_CODE_TQ_META_KEY_NOT_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A09) //"Target key not in transaction") +#define TSDB_CODE_TQ_META_KEY_DUP_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A0A) //"Target key duplicated in transaction") +#define TSDB_CODE_TQ_GROUP_NOT_SET TAOS_DEF_ERROR_CODE(0, 0x0A0B) //"Group of corresponding client is not set by mnode") + // wal #define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) //"Unexpected generic error in wal") #define TSDB_CODE_WAL_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x1001) //"WAL file is corrupted") diff --git a/include/util/tlog.h b/include/util/tlog.h index 5e6604598d..5c91398cdc 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -42,11 +42,11 @@ extern int32_t qDebugFlag; extern int32_t wDebugFlag; extern int32_t sDebugFlag; extern int32_t tsdbDebugFlag; +extern int32_t tqDebugFlag; extern int32_t cqDebugFlag; extern int32_t debugFlag; extern int32_t ctgDebugFlag; - #define DEBUG_FATAL 1U #define DEBUG_ERROR DEBUG_FATAL #define DEBUG_WARN 2U diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 3402d3ff68..50b1a1cf20 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -44,7 +44,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TSDB_MSG_TYPE_MQ_SET] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TSDB_MSG_TYPE_MQ_SET_CUR] = dndProcessVnodeWriteMsg; // msg from client to mnode pMgmt->msgFp[TSDB_MSG_TYPE_CONNECT] = dndProcessMnodeReadMsg; diff --git a/source/dnode/vnode/impl/src/vnodeWrite.c b/source/dnode/vnode/impl/src/vnodeWrite.c index 85e044266a..fc977ce03f 100644 --- a/source/dnode/vnode/impl/src/vnodeWrite.c +++ b/source/dnode/vnode/impl/src/vnodeWrite.c @@ -16,17 +16,13 @@ #include "vnodeDef.h" int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { - SVnodeReq *pVnodeReq; - switch (pMsg->msgType) { - case TSDB_MSG_TYPE_MQ_SET: + case TSDB_MSG_TYPE_MQ_SET_CUR: if (tqSetCursor(pVnode->pTq, pMsg->pCont) < 0) { // TODO: handle error } break; } - - void *pBuf = pMsg->pCont; return 0; } diff --git a/source/dnode/vnode/tq/CMakeLists.txt b/source/dnode/vnode/tq/CMakeLists.txt index 536e97d5f7..8d59c7b07a 100644 --- a/source/dnode/vnode/tq/CMakeLists.txt +++ b/source/dnode/vnode/tq/CMakeLists.txt @@ -11,6 +11,7 @@ target_link_libraries( PUBLIC wal PUBLIC os PUBLIC util + PUBLIC common ) if(${BUILD_TEST}) diff --git a/source/dnode/vnode/tq/inc/tqInt.h b/source/dnode/vnode/tq/inc/tqInt.h index 022b599816..5685a29d03 100644 --- a/source/dnode/vnode/tq/inc/tqInt.h +++ b/source/dnode/vnode/tq/inc/tqInt.h @@ -17,11 +17,20 @@ #define _TD_TQ_INT_H_ #include "tq.h" - +#include "tlog.h" #ifdef __cplusplus extern "C" { #endif +extern int32_t tqDebugFlag; + +#define tqFatal(...) { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", 255, __VA_ARGS__); }} +#define tqError(...) { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", 255, __VA_ARGS__); }} +#define tqWarn(...) { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", 255, __VA_ARGS__); }} +#define tqInfo(...) { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", 255, __VA_ARGS__); }} +#define tqDebug(...) { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }} +#define tqTrace(...) { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }} + // create persistent storage for meta info such as consuming offset // return value > 0: cgId // return value <= 0: error code diff --git a/source/dnode/vnode/tq/inc/tqMetaStore.h b/source/dnode/vnode/tq/inc/tqMetaStore.h index 5bcedaed74..ef71d8bf14 100644 --- a/source/dnode/vnode/tq/inc/tqMetaStore.h +++ b/source/dnode/vnode/tq/inc/tqMetaStore.h @@ -17,7 +17,7 @@ #define _TQ_META_STORE_H_ #include "os.h" -#include "tq.h" +#include "tqInt.h" #ifdef __cplusplus extern "C" { diff --git a/source/dnode/vnode/tq/src/tq.c b/source/dnode/vnode/tq/src/tq.c index 1a27870a1b..d8dfe4ddcf 100644 --- a/source/dnode/vnode/tq/src/tq.c +++ b/source/dnode/vnode/tq/src/tq.c @@ -24,89 +24,80 @@ // handle management message // -int tqGetgHandleSSize(const STqGroupHandle* gHandle); -int tqBufHandleSSize(); -int tqBufItemSSize(); +int tqGroupSSize(const STqGroup* pGroup); +int tqTopicSSize(); +int tqItemSSize(); -STqGroupHandle* tqFindHandle(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { - STqGroupHandle* gHandle; - return NULL; -} +void* tqSerializeListHandle(STqList* listHandle, void* ptr); +void* tqSerializeTopic(STqTopic* pTopic, void* ptr); +void* tqSerializeItem(STqMsgItem* pItem, void* ptr); -void* tqSerializeListHandle(STqListHandle* listHandle, void* ptr); -void* tqSerializeBufHandle(STqBufferHandle* bufHandle, void* ptr); -void* tqSerializeBufItem(STqBufferItem* bufItem, void* ptr); - -const void* tqDeserializeBufHandle(const void* pBytes, STqBufferHandle* bufHandle); -const void* tqDeserializeBufItem(const void* pBytes, STqBufferItem* bufItem); +const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic); +const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem); STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac) { STQ* pTq = malloc(sizeof(STQ)); if (pTq == NULL) { - // TODO: memory error + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return NULL; } pTq->path = strdup(path); pTq->tqConfig = tqConfig; pTq->tqLogReader = tqLogReader; pTq->tqMemRef.pAlloctorFactory = allocFac; - // pTq->tqMemRef.pAllocator = allocFac->create(allocFac); + pTq->tqMemRef.pAllocator = allocFac->create(allocFac); if (pTq->tqMemRef.pAllocator == NULL) { - // TODO + // TODO: error code of buffer pool } - pTq->tqMeta = - tqStoreOpen(path, (FTqSerialize)tqSerializeGroupHandle, (FTqDeserialize)tqDeserializeGroupHandle, free, 0); + pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0); if (pTq->tqMeta == NULL) { // TODO: free STQ return NULL; } return pTq; } - -void tqClose(STQ*pTq) { +void tqClose(STQ* pTq) { // TODO } -static int tqProtoCheck(TmqMsgHead *pMsg) { - return pMsg->protoVer == 0; -} +static int tqProtoCheck(STqMsgHead* pMsg) { return pMsg->protoVer == 0; } -static int tqAckOneTopic(STqBufferHandle* bHandle, TmqOneAck* pAck, STqQueryMsg** ppQuery) { +static int tqAckOneTopic(STqTopic* pTopic, STqOneAck* pAck, STqQueryMsg** ppQuery) { // clean old item and move forward int32_t consumeOffset = pAck->consumeOffset; int idx = consumeOffset % TQ_BUFFER_SIZE; - ASSERT(bHandle->buffer[idx].content && bHandle->buffer[idx].executor); - tfree(bHandle->buffer[idx].content); + ASSERT(pTopic->buffer[idx].content && pTopic->buffer[idx].executor); + tfree(pTopic->buffer[idx].content); if (1 /* TODO: need to launch new query */) { STqQueryMsg* pNewQuery = malloc(sizeof(STqQueryMsg)); if (pNewQuery == NULL) { - // TODO: memory insufficient + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return -1; } // TODO: lock executor - pNewQuery->exec->executor = bHandle->buffer[idx].executor; // TODO: read from wal and assign to src - pNewQuery->exec->src = 0; - pNewQuery->exec->dest = &bHandle->buffer[idx]; - pNewQuery->next = *ppQuery; - *ppQuery = pNewQuery; + /*pNewQuery->exec->executor = pTopic->buffer[idx].executor;*/ + /*pNewQuery->exec->src = 0;*/ + /*pNewQuery->exec->dest = &pTopic->buffer[idx];*/ + /*pNewQuery->next = *ppQuery;*/ + /**ppQuery = pNewQuery;*/ } return 0; } -static int tqAck(STqGroupHandle* gHandle, TmqAcks* pAcks) { +static int tqAck(STqGroup* pGroup, STqAcks* pAcks) { int32_t ackNum = pAcks->ackNum; - TmqOneAck* acks = pAcks->acks; + STqOneAck* acks = pAcks->acks; // double ptr for acks and list - int i = 0; - STqListHandle* node = gHandle->head; - int ackCnt = 0; - STqQueryMsg* pQuery = NULL; + int i = 0; + STqList* node = pGroup->head; + int ackCnt = 0; + STqQueryMsg* pQuery = NULL; while (i < ackNum && node->next) { - if (acks[i].topicId == node->next->bufHandle.topicId) { + if (acks[i].topicId == node->next->topic.topicId) { ackCnt++; - tqAckOneTopic(&node->next->bufHandle, &acks[i], &pQuery); - } else if (acks[i].topicId < node->next->bufHandle.topicId) { + tqAckOneTopic(&node->next->topic, &acks[i], &pQuery); + } else if (acks[i].topicId < node->next->topic.topicId) { i++; } else { node = node->next; @@ -118,52 +109,56 @@ static int tqAck(STqGroupHandle* gHandle, TmqAcks* pAcks) { return ackCnt; } -static int tqCommitTCGroup(STqGroupHandle* handle) { +static int tqCommitGroup(STqGroup* pGroup) { // persist modification into disk return 0; } -int tqCreateTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroupHandle** handle) { +int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup** ppGroup) { // create in disk - STqGroupHandle* gHandle = (STqGroupHandle*)malloc(sizeof(STqGroupHandle)); - if (gHandle == NULL) { + STqGroup* pGroup = (STqGroup*)malloc(sizeof(STqGroup)); + if (pGroup == NULL) { // TODO return -1; } - memset(gHandle, 0, sizeof(STqGroupHandle)); + *ppGroup = pGroup; + memset(pGroup, 0, sizeof(STqGroup)); return 0; } -STqGroupHandle* tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { - STqGroupHandle* gHandle = tqHandleGet(pTq->tqMeta, cId); - if (gHandle == NULL) { - int code = tqCreateTCGroup(pTq, topicId, cgId, cId, &gHandle); - if (code != 0) { +STqGroup* tqOpenGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { + STqGroup* pGroup = tqHandleGet(pTq->tqMeta, cId); + if (pGroup == NULL) { + int code = tqCreateGroup(pTq, topicId, cgId, cId, &pGroup); + if (code < 0) { // TODO return NULL; } + tqHandleMovePut(pTq->tqMeta, cId, pGroup); } + ASSERT(pGroup); - // create - // open - return gHandle; + return pGroup; } -int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { return 0; } +int tqCloseGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { + // TODO + return 0; +} -int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { +int tqDropGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { // delete from disk return 0; } -static int tqFetch(STqGroupHandle* gHandle, void** msg) { - STqListHandle* head = gHandle->head; - STqListHandle* node = head; - int totSize = 0; +static int tqFetch(STqGroup* pGroup, void** msg) { + STqList* head = pGroup->head; + STqList* node = head; + int totSize = 0; // TODO: make it a macro int sizeLimit = 4 * 1024; - TmqMsgContent* buffer = malloc(sizeLimit); + STqMsgContent* buffer = malloc(sizeLimit); if (buffer == NULL) { // TODO:memory insufficient return -1; @@ -172,25 +167,25 @@ static int tqFetch(STqGroupHandle* gHandle, void** msg) { // until all topic iterated or msgs over sizeLimit while (node->next) { node = node->next; - STqBufferHandle* bufHandle = &node->bufHandle; - int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE; - if (bufHandle->buffer[idx].content != NULL && bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset) { - totSize += bufHandle->buffer[idx].size; + STqTopic* topicHandle = &node->topic; + int idx = topicHandle->nextConsumeOffset % TQ_BUFFER_SIZE; + if (topicHandle->buffer[idx].content != NULL && topicHandle->buffer[idx].offset == topicHandle->nextConsumeOffset) { + totSize += topicHandle->buffer[idx].size; if (totSize > sizeLimit) { void* ptr = realloc(buffer, totSize); if (ptr == NULL) { - totSize -= bufHandle->buffer[idx].size; + totSize -= topicHandle->buffer[idx].size; // TODO:memory insufficient // return msgs already copied break; } } - *((int64_t*)buffer) = bufHandle->topicId; + *((int64_t*)buffer) = topicHandle->topicId; buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); - *((int64_t*)buffer) = bufHandle->buffer[idx].size; + *((int64_t*)buffer) = topicHandle->buffer[idx].size; buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); - memcpy(buffer, bufHandle->buffer[idx].content, bufHandle->buffer[idx].size); - buffer = POINTER_SHIFT(buffer, bufHandle->buffer[idx].size); + memcpy(buffer, topicHandle->buffer[idx].content, topicHandle->buffer[idx].size); + buffer = POINTER_SHIFT(buffer, topicHandle->buffer[idx].size); if (totSize > sizeLimit) { break; } @@ -199,11 +194,19 @@ static int tqFetch(STqGroupHandle* gHandle, void** msg) { return totSize; } -STqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) { return NULL; } +STqGroup* tqGetGroup(STQ* pTq, int64_t clientId) { return tqHandleGet(pTq->tqMeta, clientId); } -int tqLaunchQuery(STqGroupHandle* gHandle) { return 0; } - -int tqSendLaunchQuery(STqGroupHandle* gHandle) { return 0; } +int tqSendLaunchQuery(STqMsgItem* bufItem, int64_t offset) { + if (tqQueryExecuting(bufItem->status)) { + return 0; + } + bufItem->status = 1; + // load data from wal or buffer pool + // put into exec + // send exec into non blocking queue + // when query finished, put into buffer pool + return 0; +} /*int tqMoveOffsetToNext(TqGroupHandle* gHandle) {*/ /*return 0;*/ @@ -220,23 +223,96 @@ int tqCommit(STQ* pTq) { return 0; } -int tqSetCursor(STQ* pTq, void* msg) { +int tqBufferSetOffset(STqTopic* pTopic, int64_t offset) { + int code; + memset(pTopic->buffer, 0, sizeof(pTopic->buffer)); + // launch query + for (int i = offset; i < offset + TQ_BUFFER_SIZE; i++) { + int pos = i % TQ_BUFFER_SIZE; + code = tqSendLaunchQuery(&pTopic->buffer[pos], offset); + if (code < 0) { + // TODO: error handling + } + } + // set offset + pTopic->nextConsumeOffset = offset; + pTopic->floatingCursor = offset; return 0; } -int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { - if (!tqProtoCheck((TmqMsgHead*)pMsg)) { - // proto version invalid - return -1; - } - int64_t clientId = pMsg->head.clientId; - STqGroupHandle* gHandle = tqGetGroupHandle(pTq, clientId); +STqTopic* tqFindTopic(STqGroup* pGroup, int64_t topicId) { + // TODO + return NULL; +} + +int tqSetCursor(STQ* pTq, STqSetCurReq* pMsg) { + int code; + int64_t clientId = pMsg->head.clientId; + int64_t topicId = pMsg->topicId; + int64_t offset = pMsg->offset; + STqGroup* gHandle = tqGetGroup(pTq, clientId); if (gHandle == NULL) { // client not connect return -1; } + STqTopic* topicHandle = tqFindTopic(gHandle, topicId); + if (topicHandle == NULL) { + return -1; + } + if (pMsg->offset == topicHandle->nextConsumeOffset) { + return 0; + } + // TODO: check log last version + + code = tqBufferSetOffset(topicHandle, offset); + if (code < 0) { + // set error code + return -1; + } + + return 0; +} + +int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { + int64_t clientId = pMsg->head.clientId; + STqGroup* pGroup = tqGetGroup(pTq, clientId); + if (pGroup == NULL) { + terrno = TSDB_CODE_TQ_GROUP_NOT_SET; + return -1; + } + + STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg; + int numOfMsgs = tqFetch(pGroup, (void**)&pRsp->msgs); + if (numOfMsgs < 0) { + return -1; + } + if (numOfMsgs == 0) { + // most recent data has been fetched + + // enable timer for blocking wait + // once new data written during wait time + // launch query and response + } + + // fetched a num of msgs, rpc response + + return 0; +} + +#if 0 +int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { + if (!tqProtoCheck((STqMsgHead*)pMsg)) { + // proto version invalid + return -1; + } + int64_t clientId = pMsg->head.clientId; + STqGroup* pGroup = tqGetGroup(pTq, clientId); + if (pGroup == NULL) { + // client not connect + return -1; + } if (pMsg->acks.ackNum != 0) { - if (tqAck(gHandle, &pMsg->acks) != 0) { + if (tqAck(pGroup, &pMsg->acks) != 0) { // ack not success return -1; } @@ -244,22 +320,23 @@ int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg; - if (tqFetch(gHandle, (void**)&pRsp->msgs) <= 0) { + if (tqFetch(pGroup, (void**)&pRsp->msgs) <= 0) { // fetch error return -1; } // judge and launch new query - if (tqLaunchQuery(gHandle)) { - // launch query error - return -1; - } + /*if (tqSendLaunchQuery(gHandle)) {*/ + // launch query error + /*return -1;*/ + /*}*/ return 0; } +#endif -int tqSerializeGroupHandle(const STqGroupHandle* gHandle, STqSerializedHead** ppHead) { +int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) { // calculate size - int sz = tqGetgHandleSSize(gHandle) + sizeof(STqSerializedHead); + int sz = tqGroupSSize(pGroup) + sizeof(STqSerializedHead); if (sz > (*ppHead)->ssize) { void* tmpPtr = realloc(*ppHead, sz); if (tmpPtr == NULL) { @@ -272,53 +349,53 @@ int tqSerializeGroupHandle(const STqGroupHandle* gHandle, STqSerializedHead** pp } void* ptr = (*ppHead)->content; // do serialization - *(int64_t*)ptr = gHandle->cId; + *(int64_t*)ptr = pGroup->clientId; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - *(int64_t*)ptr = gHandle->cgId; + *(int64_t*)ptr = pGroup->cgId; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - *(int32_t*)ptr = gHandle->topicNum; + *(int32_t*)ptr = pGroup->topicNum; ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); - if (gHandle->topicNum > 0) { - tqSerializeListHandle(gHandle->head, ptr); + if (pGroup->topicNum > 0) { + tqSerializeListHandle(pGroup->head, ptr); } return 0; } -void* tqSerializeListHandle(STqListHandle* listHandle, void* ptr) { - STqListHandle* node = listHandle; +void* tqSerializeListHandle(STqList* listHandle, void* ptr) { + STqList* node = listHandle; ASSERT(node != NULL); while (node) { - ptr = tqSerializeBufHandle(&node->bufHandle, ptr); + ptr = tqSerializeTopic(&node->topic, ptr); node = node->next; } return ptr; } -void* tqSerializeBufHandle(STqBufferHandle* bufHandle, void* ptr) { - *(int64_t*)ptr = bufHandle->nextConsumeOffset; +void* tqSerializeTopic(STqTopic* pTopic, void* ptr) { + *(int64_t*)ptr = pTopic->nextConsumeOffset; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - *(int64_t*)ptr = bufHandle->topicId; + *(int64_t*)ptr = pTopic->topicId; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - *(int32_t*)ptr = bufHandle->head; + *(int32_t*)ptr = pTopic->head; ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); - *(int32_t*)ptr = bufHandle->tail; + *(int32_t*)ptr = pTopic->tail; ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); for (int i = 0; i < TQ_BUFFER_SIZE; i++) { - ptr = tqSerializeBufItem(&bufHandle->buffer[i], ptr); + ptr = tqSerializeItem(&pTopic->buffer[i], ptr); } return ptr; } -void* tqSerializeBufItem(STqBufferItem* bufItem, void* ptr) { +void* tqSerializeItem(STqMsgItem* bufItem, void* ptr) { // TODO: do we need serialize this? // mainly for executor return ptr; } -const void* tqDeserializeGroupHandle(const STqSerializedHead* pHead, STqGroupHandle** ppGHandle) { - STqGroupHandle* gHandle = *ppGHandle; - const void* ptr = pHead->content; - gHandle->cId = *(int64_t*)ptr; +const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup) { + STqGroup* gHandle = *ppGroup; + const void* ptr = pHead->content; + gHandle->clientId = *(int64_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); gHandle->cgId = *(int64_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); @@ -326,63 +403,63 @@ const void* tqDeserializeGroupHandle(const STqSerializedHead* pHead, STqGroupHan gHandle->topicNum = *(int32_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); gHandle->head = NULL; - STqListHandle* node = gHandle->head; + STqList* node = gHandle->head; for (int i = 0; i < gHandle->topicNum; i++) { if (gHandle->head == NULL) { - if ((node = malloc(sizeof(STqListHandle))) == NULL) { + if ((node = malloc(sizeof(STqList))) == NULL) { // TODO: error return NULL; } node->next = NULL; - ptr = tqDeserializeBufHandle(ptr, &node->bufHandle); + ptr = tqDeserializeTopic(ptr, &node->topic); gHandle->head = node; } else { - node->next = malloc(sizeof(STqListHandle)); + node->next = malloc(sizeof(STqList)); if (node->next == NULL) { // TODO: error return NULL; } node->next->next = NULL; - ptr = tqDeserializeBufHandle(ptr, &node->next->bufHandle); + ptr = tqDeserializeTopic(ptr, &node->next->topic); node = node->next; } } return ptr; } -const void* tqDeserializeBufHandle(const void* pBytes, STqBufferHandle* bufHandle) { +const void* tqDeserializeTopic(const void* pBytes, STqTopic* topic) { const void* ptr = pBytes; - bufHandle->nextConsumeOffset = *(int64_t*)ptr; + topic->nextConsumeOffset = *(int64_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - bufHandle->topicId = *(int64_t*)ptr; + topic->topicId = *(int64_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - bufHandle->head = *(int32_t*)ptr; + topic->head = *(int32_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); - bufHandle->tail = *(int32_t*)ptr; + topic->tail = *(int32_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); for (int i = 0; i < TQ_BUFFER_SIZE; i++) { - ptr = tqDeserializeBufItem(ptr, &bufHandle->buffer[i]); + ptr = tqDeserializeItem(ptr, &topic->buffer[i]); } return ptr; } -const void* tqDeserializeBufItem(const void* pBytes, STqBufferItem* bufItem) { return pBytes; } +const void* tqDeserializeItem(const void* pBytes, STqMsgItem* bufItem) { return pBytes; } // TODO: make this a macro -int tqGetgHandleSSize(const STqGroupHandle* gHandle) { +int tqGroupSSize(const STqGroup* gHandle) { return sizeof(int64_t) * 2 // cId + cgId + sizeof(int32_t) // topicNum - + gHandle->topicNum * tqBufHandleSSize(); + + gHandle->topicNum * tqTopicSSize(); } // TODO: make this a macro -int tqBufHandleSSize() { +int tqTopicSSize() { return sizeof(int64_t) * 2 // nextConsumeOffset + topicId + sizeof(int32_t) * 2 // head + tail - + TQ_BUFFER_SIZE * tqBufItemSSize(); + + TQ_BUFFER_SIZE * tqItemSSize(); } -int tqBufItemSSize() { +int tqItemSSize() { // TODO: do this need serialization? // mainly for executor return 0; diff --git a/source/dnode/vnode/tq/src/tqMetaStore.c b/source/dnode/vnode/tq/src/tqMetaStore.c index 082f0ad28e..57e20010e3 100644 --- a/source/dnode/vnode/tq/src/tqMetaStore.c +++ b/source/dnode/vnode/tq/src/tqMetaStore.c @@ -56,6 +56,7 @@ static inline int tqReadLastPage(int fd, STqIdxPageBuf* pBuf) { int offset = tqSeekLastPage(fd); int nBytes; if ((nBytes = read(fd, pBuf, TQ_PAGE_SIZE)) == -1) { + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } if (nBytes == 0) { @@ -71,7 +72,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial int32_t tqConfigFlag) { STqMetaStore* pMeta = malloc(sizeof(STqMetaStore)); if (pMeta == NULL) { - // close + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return NULL; } memset(pMeta, 0, sizeof(STqMetaStore)); @@ -79,8 +80,9 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial // concat data file name and index file name size_t pathLen = strlen(path); pMeta->dirPath = malloc(pathLen + 1); - if (pMeta->dirPath != NULL) { - // TODO: memory insufficient + if (pMeta->dirPath == NULL) { + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; + return NULL; } strcpy(pMeta->dirPath, path); @@ -88,13 +90,14 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial strcpy(name, path); if (taosDirExist(name) != 0 && taosMkDir(name) != 0) { - ASSERT(false); + terrno = TSDB_CODE_TQ_FAILED_TO_CREATE_DIR; + tqError("failed to create dir:%s since %s ", name, terrstr()); } strcat(name, "/" TQ_IDX_NAME); int idxFd = open(name, O_RDWR | O_CREAT, 0755); if (idxFd < 0) { - ASSERT(false); - // close file + terrno = TAOS_SYSTEM_ERROR(errno); + tqError("failed to open file:%s since %s ", name, terrstr()); // free memory return NULL; } @@ -102,9 +105,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial pMeta->idxFd = idxFd; pMeta->unpersistHead = malloc(sizeof(STqMetaList)); if (pMeta->unpersistHead == NULL) { - ASSERT(false); - // close file - // free memory + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return NULL; } memset(pMeta->unpersistHead, 0, sizeof(STqMetaList)); @@ -114,7 +115,8 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial strcat(name, "/" TQ_META_NAME); int fileFd = open(name, O_RDWR | O_CREAT, 0755); if (fileFd < 0) { - ASSERT(false); + terrno = TAOS_SYSTEM_ERROR(errno); + tqError("failed to open file:%s since %s", name, terrstr()); return NULL; } @@ -129,7 +131,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial STqIdxPageBuf idxBuf; STqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE); if (serializedObj == NULL) { - // TODO:memory insufficient + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; } int idxRead; int allocated = TQ_PAGE_SIZE; @@ -137,14 +139,16 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial while ((idxRead = read(idxFd, &idxBuf, TQ_PAGE_SIZE))) { if (idxRead == -1) { // TODO: handle error - ASSERT(false); + terrno = TAOS_SYSTEM_ERROR(errno); + tqError("failed to read tq index file since %s", terrstr()); } ASSERT(idxBuf.head.writeOffset == idxRead); // loop read every entry for (int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) { STqMetaList* pNode = malloc(sizeof(STqMetaList)); if (pNode == NULL) { - // TODO: free memory and return error + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; + // TODO: free memory } memset(pNode, 0, sizeof(STqMetaList)); memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE); @@ -153,7 +157,8 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial if (allocated < pNode->handle.serializedSize) { void* ptr = realloc(serializedObj, pNode->handle.serializedSize); if (ptr == NULL) { - // TODO: memory insufficient + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; + // TODO: free memory } serializedObj = ptr; allocated = pNode->handle.serializedSize; @@ -292,7 +297,7 @@ int32_t tqStorePersist(STqMetaStore* pMeta) { STqMetaList* pNode = pHead->unpersistNext; STqSerializedHead* pSHead = malloc(sizeof(STqSerializedHead)); if (pSHead == NULL) { - // TODO: memory error + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return -1; } pSHead->ver = TQ_SVER; @@ -403,7 +408,6 @@ static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* valu STqMetaList* pNode = pMeta->bucket[bucketKey]; while (pNode) { if (pNode->handle.key == key) { - // TODO: think about thread safety if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { pMeta->pDeleter(pNode->handle.valueInUse); } @@ -416,7 +420,7 @@ static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* valu } STqMetaList* pNewNode = malloc(sizeof(STqMetaList)); if (pNewNode == NULL) { - // TODO: memory error + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return -1; } memset(pNewNode, 0, sizeof(STqMetaList)); @@ -470,10 +474,10 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va STqMetaList* pNode = pMeta->bucket[bucketKey]; while (pNode) { if (pNode->handle.key == key) { - // TODO: think about thread safety if (pNode->handle.valueInTxn) { - if (TqDupIntxnReject(pMeta->tqConfigFlag)) { - return -2; + if (tqDupIntxnReject(pMeta->tqConfigFlag)) { + terrno = TSDB_CODE_TQ_META_KEY_DUP_IN_TXN; + return -1; } if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { pMeta->pDeleter(pNode->handle.valueInTxn); @@ -488,7 +492,7 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va } STqMetaList* pNewNode = malloc(sizeof(STqMetaList)); if (pNewNode == NULL) { - // TODO: memory error + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return -1; } memset(pNewNode, 0, sizeof(STqMetaList)); @@ -505,7 +509,7 @@ int32_t tqHandleMovePut(STqMetaStore* pMeta, int64_t key, void* value) { return int32_t tqHandleCopyPut(STqMetaStore* pMeta, int64_t key, void* value, size_t vsize) { void* vmem = malloc(vsize); if (vmem == NULL) { - // TODO: memory error + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return -1; } memcpy(vmem, value, vsize); @@ -535,6 +539,7 @@ int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) { while (pNode) { if (pNode->handle.key == key) { if (pNode->handle.valueInTxn == NULL) { + terrno = TSDB_CODE_TQ_META_KEY_NOT_IN_TXN; return -1; } if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { @@ -548,7 +553,8 @@ int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) { pNode = pNode->next; } } - return -2; + terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY; + return -1; } int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) { @@ -564,12 +570,14 @@ int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) { tqLinkUnpersist(pMeta, pNode); return 0; } + terrno = TSDB_CODE_TQ_META_KEY_NOT_IN_TXN; return -1; } else { pNode = pNode->next; } } - return -2; + terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY; + return -1; } int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) { @@ -588,7 +596,7 @@ int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) { pNode = pNode->next; } } - // no such key + terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY; return -1; } diff --git a/source/dnode/vnode/tq/test/tqMetaTest.cpp b/source/dnode/vnode/tq/test/tqMetaTest.cpp index 58263efa71..d3c9b50e4a 100644 --- a/source/dnode/vnode/tq/test/tqMetaTest.cpp +++ b/source/dnode/vnode/tq/test/tqMetaTest.cpp @@ -10,8 +10,8 @@ struct Foo { }; int FooSerializer(const void* pObj, STqSerializedHead** ppHead) { - Foo* foo = (Foo*) pObj; - if((*ppHead) == NULL || (*ppHead)->ssize < sizeof(STqSerializedHead) + sizeof(int32_t)) { + Foo* foo = (Foo*)pObj; + if ((*ppHead) == NULL || (*ppHead)->ssize < sizeof(STqSerializedHead) + sizeof(int32_t)) { *ppHead = (STqSerializedHead*)realloc(*ppHead, sizeof(STqSerializedHead) + sizeof(int32_t)); (*ppHead)->ssize = sizeof(STqSerializedHead) + sizeof(int32_t); } @@ -20,36 +20,28 @@ int FooSerializer(const void* pObj, STqSerializedHead** ppHead) { } const void* FooDeserializer(const STqSerializedHead* pHead, void** ppObj) { - if(*ppObj == NULL) { + if (*ppObj == NULL) { *ppObj = realloc(*ppObj, sizeof(int32_t)); } Foo* pFoo = *(Foo**)ppObj; - pFoo->a = *(int32_t*)pHead->content; + pFoo->a = *(int32_t*)pHead->content; return NULL; } -void FooDeleter(void* pObj) { - free(pObj); -} +void FooDeleter(void* pObj) { free(pObj); } class TqMetaUpdateAppendTest : public ::testing::Test { - protected: + protected: + void SetUp() override { + taosRemoveDir(pathName); + pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); + ASSERT(pMeta); + } - void SetUp() override { - taosRemoveDir(pathName); - pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter, - TQ_UPDATE_APPEND - ); - ASSERT(pMeta); - } + void TearDown() override { tqStoreClose(pMeta); } - void TearDown() override { - tqStoreClose(pMeta); - } - - TqMetaStore* pMeta; - const char* pathName = "/tmp/tq_test"; + STqMetaStore* pMeta; + const char* pathName = "/tmp/tq_test"; }; TEST_F(TqMetaUpdateAppendTest, copyPutTest) { @@ -57,11 +49,11 @@ TEST_F(TqMetaUpdateAppendTest, copyPutTest) { foo.a = 3; tqHandleCopyPut(pMeta, 1, &foo, sizeof(Foo)); - Foo* pFoo = (Foo*) tqHandleGet(pMeta, 1); + Foo* pFoo = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); tqHandleCommit(pMeta, 1); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo->a, 3); } @@ -78,10 +70,7 @@ TEST_F(TqMetaUpdateAppendTest, persistTest) { EXPECT_EQ(pBar == NULL, true); tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter, - TQ_UPDATE_APPEND - ); + pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); ASSERT(pMeta); pBar = (Foo*)tqHandleGet(pMeta, 1); @@ -97,7 +86,7 @@ TEST_F(TqMetaUpdateAppendTest, uncommittedTest) { pFoo->a = 3; tqHandleMovePut(pMeta, 1, pFoo); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); } @@ -106,11 +95,11 @@ TEST_F(TqMetaUpdateAppendTest, abortTest) { pFoo->a = 3; tqHandleMovePut(pMeta, 1, pFoo); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); tqHandleAbort(pMeta, 1); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); } @@ -119,32 +108,29 @@ TEST_F(TqMetaUpdateAppendTest, deleteTest) { pFoo->a = 3; tqHandleMovePut(pMeta, 1, pFoo); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); tqHandleCommit(pMeta, 1); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); ASSERT_EQ(pFoo != NULL, true); EXPECT_EQ(pFoo->a, 3); tqHandleDel(pMeta, 1); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); ASSERT_EQ(pFoo != NULL, true); EXPECT_EQ(pFoo->a, 3); tqHandleCommit(pMeta, 1); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter, - TQ_UPDATE_APPEND - ); + pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); ASSERT(pMeta); - pFoo = (Foo*) tqHandleGet(pMeta, 1); + pFoo = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); } @@ -162,10 +148,7 @@ TEST_F(TqMetaUpdateAppendTest, intxnPersist) { EXPECT_EQ(pFoo1->a, 3); tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter, - TQ_UPDATE_APPEND - ); + pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); ASSERT(pMeta); pFoo1 = (Foo*)tqHandleGet(pMeta, 1); @@ -177,10 +160,7 @@ TEST_F(TqMetaUpdateAppendTest, intxnPersist) { EXPECT_EQ(pFoo1->a, 4); tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter, - TQ_UPDATE_APPEND - ); + pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); ASSERT(pMeta); pFoo1 = (Foo*)tqHandleGet(pMeta, 1); @@ -190,13 +170,13 @@ TEST_F(TqMetaUpdateAppendTest, intxnPersist) { TEST_F(TqMetaUpdateAppendTest, multiplePage) { srand(0); std::vector v; - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { v.push_back(rand()); Foo foo; foo.a = v[i]; tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); } - for(int i = 0; i < 500; i++) { + for (int i = 0; i < 500; i++) { tqHandleCommit(pMeta, i); Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; @@ -204,38 +184,34 @@ TEST_F(TqMetaUpdateAppendTest, multiplePage) { } tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter, - TQ_UPDATE_APPEND - ); + pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); ASSERT(pMeta); - - for(int i = 500; i < 1000; i++) { + + for (int i = 500; i < 1000; i++) { tqHandleCommit(pMeta, i); Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; EXPECT_EQ(pFoo->a, v[i]); } - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; EXPECT_EQ(pFoo->a, v[i]); } - } TEST_F(TqMetaUpdateAppendTest, multipleRewrite) { srand(0); std::vector v; - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { v.push_back(rand()); Foo foo; foo.a = v[i]; tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); } - for(int i = 0; i < 500; i++) { + for (int i = 0; i < 500; i++) { tqHandleCommit(pMeta, i); v[i] = rand(); Foo foo; @@ -243,25 +219,22 @@ TEST_F(TqMetaUpdateAppendTest, multipleRewrite) { tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); } - for(int i = 500; i < 1000; i++) { + for (int i = 500; i < 1000; i++) { v[i] = rand(); Foo foo; foo.a = v[i]; tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); } - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { tqHandleCommit(pMeta, i); } tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter, - TQ_UPDATE_APPEND - ); + pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); ASSERT(pMeta); - - for(int i = 500; i < 1000; i++) { + + for (int i = 500; i < 1000; i++) { v[i] = rand(); Foo foo; foo.a = v[i]; @@ -269,40 +242,38 @@ TEST_F(TqMetaUpdateAppendTest, multipleRewrite) { tqHandleCommit(pMeta, i); } - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; EXPECT_EQ(pFoo->a, v[i]); } - } TEST_F(TqMetaUpdateAppendTest, dupCommit) { srand(0); std::vector v; - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { v.push_back(rand()); Foo foo; foo.a = v[i]; tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); } - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { int ret = tqHandleCommit(pMeta, i); EXPECT_EQ(ret, 0); ret = tqHandleCommit(pMeta, i); EXPECT_EQ(ret, -1); } - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { int ret = tqHandleCommit(pMeta, i); EXPECT_EQ(ret, -1); } - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; EXPECT_EQ(pFoo->a, v[i]); } - } diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 23a923a315..5bac59f913 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -14,21 +14,21 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "ulog.h" #include "tlog.h" +#include "os.h" #include "tnote.h" #include "tutil.h" +#include "ulog.h" #include "zlib.h" - -#define MAX_LOGLINE_SIZE (1000) -#define MAX_LOGLINE_BUFFER_SIZE (MAX_LOGLINE_SIZE + 10) -#define MAX_LOGLINE_CONTENT_SIZE (MAX_LOGLINE_SIZE - 100) -#define MAX_LOGLINE_DUMP_SIZE (65 * 1024) -#define MAX_LOGLINE_DUMP_BUFFER_SIZE (MAX_LOGLINE_DUMP_SIZE + 10) + +#define MAX_LOGLINE_SIZE (1000) +#define MAX_LOGLINE_BUFFER_SIZE (MAX_LOGLINE_SIZE + 10) +#define MAX_LOGLINE_CONTENT_SIZE (MAX_LOGLINE_SIZE - 100) +#define MAX_LOGLINE_DUMP_SIZE (65 * 1024) +#define MAX_LOGLINE_DUMP_BUFFER_SIZE (MAX_LOGLINE_DUMP_SIZE + 10) #define MAX_LOGLINE_DUMP_CONTENT_SIZE (MAX_LOGLINE_DUMP_SIZE - 100) -#define LOG_FILE_NAME_LEN 300 +#define LOG_FILE_NAME_LEN 300 #define TSDB_DEFAULT_LOG_BUF_SIZE (20 * 1024 * 1024) // 20MB #define DEFAULT_LOG_INTERVAL 25 @@ -38,13 +38,13 @@ #define LOG_MAX_WAIT_MSEC 1000 #define LOG_BUF_BUFFER(x) ((x)->buffer) -#define LOG_BUF_START(x) ((x)->buffStart) -#define LOG_BUF_END(x) ((x)->buffEnd) -#define LOG_BUF_SIZE(x) ((x)->buffSize) -#define LOG_BUF_MUTEX(x) ((x)->buffMutex) +#define LOG_BUF_START(x) ((x)->buffStart) +#define LOG_BUF_END(x) ((x)->buffEnd) +#define LOG_BUF_SIZE(x) ((x)->buffSize) +#define LOG_BUF_MUTEX(x) ((x)->buffMutex) typedef struct { - char * buffer; + char *buffer; int32_t buffStart; int32_t buffEnd; int32_t buffSize; @@ -57,18 +57,18 @@ typedef struct { } SLogBuff; typedef struct { - int32_t fileNum; - int32_t maxLines; - int32_t lines; - int32_t flag; - int32_t openInProgress; - pid_t pid; - char logName[LOG_FILE_NAME_LEN]; - SLogBuff * logHandle; + int32_t fileNum; + int32_t maxLines; + int32_t lines; + int32_t flag; + int32_t openInProgress; + pid_t pid; + char logName[LOG_FILE_NAME_LEN]; + SLogBuff *logHandle; pthread_mutex_t logMutex; } SLogObj; -int8_t tscEmbeddedInUtil = 0; +int8_t tscEmbeddedInUtil = 0; int32_t tsLogKeepDays = 0; int8_t tsAsyncLog = 1; @@ -93,19 +93,19 @@ int32_t debugFlag = 0; int32_t sDebugFlag = 135; int32_t wDebugFlag = 135; int32_t tsdbDebugFlag = 131; +int32_t tqDebugFlag = 131; int32_t cqDebugFlag = 131; int32_t fsDebugFlag = 135; int32_t ctgDebugFlag = 131; - int64_t dbgEmptyW = 0; int64_t dbgWN = 0; int64_t dbgSmallWN = 0; int64_t dbgBigWN = 0; int64_t dbgWSize = 0; -static SLogObj tsLogObj = { .fileNum = 1 }; -static void * taosAsyncOutputLog(void *param); +static SLogObj tsLogObj = {.fileNum = 1}; +static void *taosAsyncOutputLog(void *param); static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen); static SLogBuff *taosLogBuffNew(int32_t bufSize); static void taosCloseLogByFd(int32_t oldFd); @@ -139,8 +139,8 @@ static void taosStopLog() { void taosCloseLog() { taosStopLog(); - //tsem_post(&(tsLogObj.logHandle->buffNotEmpty)); - taosMsleep(MAX_LOG_INTERVAL/1000); + // tsem_post(&(tsLogObj.logHandle->buffNotEmpty)); + taosMsleep(MAX_LOG_INTERVAL / 1000); if (taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) { pthread_join(tsLogObj.logHandle->asyncThread, NULL); } @@ -217,7 +217,7 @@ static void *taosThreadToOpenNewFile(void *param) { tsLogObj.lines = 0; tsLogObj.openInProgress = 0; taosCloseLogByFd(oldFd); - + uInfo(" new log file:%d is opened", tsLogObj.flag); uInfo("=================================="); taosPrintCfg(); @@ -308,9 +308,9 @@ static void taosGetLogFileName(char *fn) { static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) { #ifdef WINDOWS /* - * always set maxFileNum to 1 - * means client log filename is unique in windows - */ + * always set maxFileNum to 1 + * means client log filename is unique in windows + */ maxFileNum = 1; #endif @@ -381,13 +381,14 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) { void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) { if (tsTotalLogDirGB != 0 && tsAvailLogDirGB < tsMinimalLogDirGB) { - printf("server disk:%s space remain %.3f GB, total %.1f GB, stop print log.\n", tsLogDir, tsAvailLogDirGB, tsTotalLogDirGB); + printf("server disk:%s space remain %.3f GB, total %.1f GB, stop print log.\n", tsLogDir, tsAvailLogDirGB, + tsTotalLogDirGB); fflush(stdout); return; } va_list argpointer; - char buffer[MAX_LOGLINE_BUFFER_SIZE] = { 0 }; + char buffer[MAX_LOGLINE_BUFFER_SIZE] = {0}; int32_t len; struct tm Tm, *ptm; struct timeval timeSecs; @@ -434,20 +435,20 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) { } } - if (dflag & DEBUG_SCREEN) - taosWriteFile(1, buffer, (uint32_t)len); + if (dflag & DEBUG_SCREEN) taosWriteFile(1, buffer, (uint32_t)len); if (dflag == 255) nInfo(buffer, len); } void taosDumpData(unsigned char *msg, int32_t len) { if (tsTotalLogDirGB != 0 && tsAvailLogDirGB < tsMinimalLogDirGB) { - printf("server disk:%s space remain %.3f GB, total %.1f GB, stop dump log.\n", tsLogDir, tsAvailLogDirGB, tsTotalLogDirGB); + printf("server disk:%s space remain %.3f GB, total %.1f GB, stop dump log.\n", tsLogDir, tsAvailLogDirGB, + tsTotalLogDirGB); fflush(stdout); return; } - char temp[256]; - int32_t i, pos = 0, c = 0; + char temp[256]; + int32_t i, pos = 0, c = 0; for (i = 0; i < len; ++i) { sprintf(temp + pos, "%02x ", msg[i]); @@ -468,7 +469,8 @@ void taosDumpData(unsigned char *msg, int32_t len) { void taosPrintLongString(const char *flags, int32_t dflag, const char *format, ...) { if (tsTotalLogDirGB != 0 && tsAvailLogDirGB < tsMinimalLogDirGB) { - printf("server disk:%s space remain %.3f GB, total %.1f GB, stop write log.\n", tsLogDir, tsAvailLogDirGB, tsTotalLogDirGB); + printf("server disk:%s space remain %.3f GB, total %.1f GB, stop write log.\n", tsLogDir, tsAvailLogDirGB, + tsTotalLogDirGB); fflush(stdout); return; } @@ -503,7 +505,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, . } else { taosWriteFile(tsLogObj.logHandle->fd, buffer, len); } - + if (tsLogObj.maxLines > 0) { atomic_add_fetch_32(&tsLogObj.lines, 1); @@ -542,7 +544,7 @@ static SLogBuff *taosLogBuffNew(int32_t bufSize) { tLogBuff->stop = 0; if (pthread_mutex_init(&LOG_BUF_MUTEX(tLogBuff), NULL) < 0) goto _err; - //tsem_init(&(tLogBuff->buffNotEmpty), 0, 0); + // tsem_init(&(tLogBuff->buffNotEmpty), 0, 0); return tLogBuff; @@ -576,12 +578,12 @@ static void taosCopyLogBuffer(SLogBuff *tLogBuff, int32_t start, int32_t end, ch } static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen) { - int32_t start = 0; - int32_t end = 0; - int32_t remainSize = 0; + int32_t start = 0; + int32_t end = 0; + int32_t remainSize = 0; static int64_t lostLine = 0; - char tmpBuf[40] = {0}; - int32_t tmpBufLen = 0; + char tmpBuf[40] = {0}; + int32_t tmpBufLen = 0; if (tLogBuff == NULL || tLogBuff->stop) return -1; @@ -592,7 +594,7 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen) remainSize = (start > end) ? (start - end - 1) : (start + LOG_BUF_SIZE(tLogBuff) - end - 1); if (lostLine > 0) { - sprintf(tmpBuf, "...Lost %"PRId64" lines here...\n", lostLine); + sprintf(tmpBuf, "...Lost %" PRId64 " lines here...\n", lostLine); tmpBufLen = (int32_t)strlen(tmpBuf); } @@ -610,7 +612,7 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen) taosCopyLogBuffer(tLogBuff, LOG_BUF_START(tLogBuff), LOG_BUF_END(tLogBuff), msg, msgLen); - //int32_t w = atomic_sub_fetch_32(&waitLock, 1); + // int32_t w = atomic_sub_fetch_32(&waitLock, 1); /* if (w <= 0 || ((remainSize - msgLen - tmpBufLen) < (LOG_BUF_SIZE(tLogBuff) * 4 /5))) { tsem_post(&(tLogBuff->buffNotEmpty)); @@ -622,7 +624,6 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen) pthread_mutex_unlock(&LOG_BUF_MUTEX(tLogBuff)); - return 0; } @@ -634,9 +635,9 @@ static int32_t taosGetLogRemainSize(SLogBuff *tLogBuff, int32_t start, int32_t e static void taosWriteLog(SLogBuff *tLogBuff) { static int32_t lastDuration = 0; - int32_t remainChecked = 0; - int32_t start, end, pollSize; - + int32_t remainChecked = 0; + int32_t start, end, pollSize; + do { if (remainChecked == 0) { start = LOG_BUF_START(tLogBuff); @@ -662,24 +663,24 @@ static void taosWriteLog(SLogBuff *tLogBuff) { if (start < end) { taosWriteFile(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, pollSize); } else { - int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start; - taosWriteFile(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, tsize); + int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start; + taosWriteFile(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, tsize); - taosWriteFile(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff), end); + taosWriteFile(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff), end); } dbgWN++; - dbgWSize+=pollSize; - + dbgWSize += pollSize; + if (pollSize < tLogBuff->minBuffSize) { dbgSmallWN++; if (writeInterval < MAX_LOG_INTERVAL) { writeInterval += LOG_INTERVAL_STEP; } - } else if (pollSize > LOG_BUF_SIZE(tLogBuff)/3) { + } else if (pollSize > LOG_BUF_SIZE(tLogBuff) / 3) { dbgBigWN++; writeInterval = MIN_LOG_INTERVAL; - } else if (pollSize > LOG_BUF_SIZE(tLogBuff)/4) { + } else if (pollSize > LOG_BUF_SIZE(tLogBuff) / 4) { if (writeInterval > MIN_LOG_INTERVAL) { writeInterval -= LOG_INTERVAL_STEP; } @@ -698,13 +699,13 @@ static void taosWriteLog(SLogBuff *tLogBuff) { writeInterval = MIN_LOG_INTERVAL; remainChecked = 1; - }while (1); + } while (1); } static void *taosAsyncOutputLog(void *param) { SLogBuff *tLogBuff = (SLogBuff *)param; setThreadName("log"); - + while (1) { taosMsleep(writeInterval); @@ -721,8 +722,8 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) { int32_t compressSize = 163840; int32_t ret = 0; int32_t len = 0; - char * data = malloc(compressSize); - FILE * srcFp = NULL; + char *data = malloc(compressSize); + FILE *srcFp = NULL; gzFile dstFp = NULL; srcFp = fopen(srcFileName, "r");