Merge remote-tracking branch 'origin/3.0' into feature/shm
This commit is contained in:
commit
41880bac8e
|
@ -212,6 +212,11 @@ if(${BUILD_WITH_UV})
|
|||
MESSAGE("Windows need set no-sign-compare")
|
||||
add_compile_options(-Wno-sign-compare)
|
||||
endif ()
|
||||
if (${CMAKE_SYSTEM_NAME} MATCHES "Windows")
|
||||
file(READ "libuv/include/uv.h" CONTENTS)
|
||||
string(REGEX REPLACE "/([\r]*)\nstruct uv_tcp_s {" "/\\1\ntypedef BOOL (PASCAL *LPFN_CONNECTEX) (SOCKET s, const struct sockaddr* name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength,LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);\\1\nstruct uv_tcp_s {" CONTENTS_NEW "${CONTENTS}")
|
||||
file(WRITE "libuv/include/uv.h" "${CONTENTS_NEW}")
|
||||
endif ()
|
||||
add_subdirectory(libuv)
|
||||
endif(${BUILD_WITH_UV})
|
||||
|
||||
|
@ -243,6 +248,10 @@ if(${BUILD_WITH_SQLITE})
|
|||
endif(${BUILD_WITH_SQLITE})
|
||||
|
||||
# pthread
|
||||
if(${BUILD_PTHREAD})
|
||||
ADD_DEFINITIONS("-DPTW32_STATIC_LIB")
|
||||
add_subdirectory(pthread-win32)
|
||||
endif(${BUILD_PTHREAD})
|
||||
|
||||
|
||||
# ================================================================================================
|
||||
|
|
|
@ -257,6 +257,12 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co
|
|||
void tmqShowMsg(tmq_message_t *tmq_message);
|
||||
int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
|
||||
|
||||
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
|
||||
|
||||
DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message);
|
||||
DLL_EXPORT char *tmq_get_topic_name(tmq_message_t *message);
|
||||
|
||||
/* ---------------------- OTHER ---------------------------- */
|
||||
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
|
||||
|
||||
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
|
||||
|
|
|
@ -62,18 +62,17 @@ typedef struct SDataBlockInfo {
|
|||
union {int64_t uid; int64_t blockId;};
|
||||
} SDataBlockInfo;
|
||||
|
||||
typedef struct SConstantItem {
|
||||
SColumnInfo info;
|
||||
int32_t startRow; // run-length-encoding to save the space for multiple rows
|
||||
int32_t endRow;
|
||||
SVariant value;
|
||||
} SConstantItem;
|
||||
//typedef struct SConstantItem {
|
||||
// SColumnInfo info;
|
||||
// int32_t startRow; // run-length-encoding to save the space for multiple rows
|
||||
// int32_t endRow;
|
||||
// SVariant value;
|
||||
//} SConstantItem;
|
||||
|
||||
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
|
||||
typedef struct SSDataBlock {
|
||||
SColumnDataAgg *pBlockAgg;
|
||||
SArray *pDataBlock; // SArray<SColumnInfoData>
|
||||
SArray *pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
|
||||
SDataBlockInfo info;
|
||||
} SSDataBlock;
|
||||
|
||||
|
@ -95,66 +94,15 @@ typedef struct SColumnInfoData {
|
|||
};
|
||||
} SColumnInfoData;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
|
||||
int64_t tbUid = pBlock->info.uid;
|
||||
int16_t numOfCols = pBlock->info.numOfCols;
|
||||
int16_t hasVarCol = pBlock->info.hasVarCol;
|
||||
int32_t rows = pBlock->info.rows;
|
||||
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
|
||||
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, tbUid);
|
||||
tlen += taosEncodeFixedI16(buf, numOfCols);
|
||||
tlen += taosEncodeFixedI16(buf, hasVarCol);
|
||||
tlen += taosEncodeFixedI32(buf, rows);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
|
||||
tlen += taosEncodeFixedI16(buf, pColData->info.colId);
|
||||
tlen += taosEncodeFixedI16(buf, pColData->info.type);
|
||||
tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
|
||||
int32_t colSz = rows * pColData->info.bytes;
|
||||
tlen += taosEncodeBinary(buf, pColData->pData, colSz);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
|
||||
int32_t sz;
|
||||
|
||||
buf = taosDecodeFixedI64(buf, &pBlock->info.uid);
|
||||
buf = taosDecodeFixedI16(buf, &pBlock->info.numOfCols);
|
||||
buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
|
||||
buf = taosDecodeFixedI32(buf, &pBlock->info.rows);
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SColumnInfoData data = {0};
|
||||
buf = taosDecodeFixedI16(buf, &data.info.colId);
|
||||
buf = taosDecodeFixedI16(buf, &data.info.type);
|
||||
buf = taosDecodeFixedI32(buf, &data.info.bytes);
|
||||
int32_t colSz = pBlock->info.rows * data.info.bytes;
|
||||
buf = taosDecodeBinary(buf, (void**)&data.pData, colSz);
|
||||
taosArrayPush(pBlock->pDataBlock, &data);
|
||||
}
|
||||
return (void*)buf;
|
||||
}
|
||||
void* blockDataDestroy(SSDataBlock* pBlock);
|
||||
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
|
||||
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
|
||||
|
||||
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
|
||||
if (pBlock == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
// int32_t numOfOutput = pBlock->info.numOfCols;
|
||||
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < sz; ++i) {
|
||||
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
|
||||
tfree(pColInfoData->pData);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pBlock->pDataBlock);
|
||||
tfree(pBlock->pBlockAgg);
|
||||
// tfree(pBlock);
|
||||
blockDataDestroy(pBlock);
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) {
|
||||
|
|
|
@ -52,6 +52,21 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
|
|||
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
|
||||
} while (0)
|
||||
|
||||
static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) {
|
||||
if (!pColumnInfoData->hasNull) {
|
||||
return false;
|
||||
}
|
||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||
return pColumnInfoData->varmeta.offset[row] == -1;
|
||||
} else {
|
||||
if (pColumnInfoData->nullbitmap == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return colDataIsNull_f(pColumnInfoData->nullbitmap, row);
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row,
|
||||
SColumnDataAgg* pColAgg) {
|
||||
if (!pColumnInfoData->hasNull) {
|
||||
|
@ -81,7 +96,7 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
|
|||
|
||||
#define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT)
|
||||
|
||||
|
||||
// SColumnInfoData, rowNumber
|
||||
#define colDataGetData(p1_, r_) \
|
||||
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? ((p1_)->pData + (p1_)->varmeta.offset[(r_)]) \
|
||||
: ((p1_)->pData + ((r_) * (p1_)->info.bytes)))
|
||||
|
|
|
@ -419,7 +419,7 @@ typedef struct {
|
|||
};
|
||||
} SColumnFilterList;
|
||||
/*
|
||||
* for client side struct, we only need the column id, type, bytes are not necessary
|
||||
* for client side struct, only column id, type, bytes are necessary
|
||||
* But for data in vnode side, we need all the following information.
|
||||
*/
|
||||
typedef struct {
|
||||
|
@ -2159,6 +2159,26 @@ typedef struct {
|
|||
SArray* topics; // SArray<SMqSubTopicEp>
|
||||
} SMqCMGetSubEpRsp;
|
||||
|
||||
typedef struct {
|
||||
SMqRspHead head;
|
||||
union {
|
||||
SMqPollRsp consumeRsp;
|
||||
SMqCMGetSubEpRsp getEpRsp;
|
||||
};
|
||||
void* extra;
|
||||
} SMqMsgWrapper;
|
||||
|
||||
typedef struct {
|
||||
int32_t curBlock;
|
||||
int32_t curRow;
|
||||
void** uData;
|
||||
} SMqRowIter;
|
||||
|
||||
struct tmq_message_t_v1 {
|
||||
SMqPollRsp rsp;
|
||||
SMqRowIter iter;
|
||||
};
|
||||
|
||||
struct tmq_message_t {
|
||||
SMqRspHead head;
|
||||
union {
|
||||
|
@ -2166,6 +2186,9 @@ struct tmq_message_t {
|
|||
SMqCMGetSubEpRsp getEpRsp;
|
||||
};
|
||||
void* extra;
|
||||
int32_t curBlock;
|
||||
int32_t curRow;
|
||||
void** uData;
|
||||
};
|
||||
|
||||
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); }
|
||||
|
|
|
@ -189,6 +189,7 @@ enum {
|
|||
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
|
||||
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
|
||||
|
|
|
@ -69,7 +69,7 @@ typedef struct SProjectLogicNode {
|
|||
} SProjectLogicNode;
|
||||
|
||||
typedef struct SVnodeModifLogicNode {
|
||||
SLogicNode node;;
|
||||
SLogicNode node;
|
||||
int32_t msgType;
|
||||
SArray* pDataBlocks;
|
||||
SVgDataBlocks* pVgDataBlocks;
|
||||
|
@ -124,7 +124,7 @@ typedef struct SSubLogicPlan {
|
|||
} SSubLogicPlan;
|
||||
|
||||
typedef struct SQueryLogicPlan {
|
||||
ENodeType type;;
|
||||
ENodeType type;
|
||||
int32_t totalLevel;
|
||||
SNodeList* pTopSubplans;
|
||||
} SQueryLogicPlan;
|
||||
|
@ -252,7 +252,7 @@ typedef struct SSubplan {
|
|||
} SSubplan;
|
||||
|
||||
typedef struct SQueryPlan {
|
||||
ENodeType type;;
|
||||
ENodeType type;
|
||||
uint64_t queryId;
|
||||
int32_t numOfSubplans;
|
||||
SNodeList* pSubplans; // Element is SNodeListNode. The execution level of subplan, starting from 0.
|
||||
|
|
|
@ -49,6 +49,10 @@ typedef struct STableComInfo {
|
|||
int32_t rowSize; // row size of the schema
|
||||
} STableComInfo;
|
||||
|
||||
typedef struct SIndexMeta {
|
||||
|
||||
} SIndexMeta;
|
||||
|
||||
/*
|
||||
* ASSERT(sizeof(SCTableMeta) == 24)
|
||||
* ASSERT(tableType == TSDB_CHILD_TABLE)
|
||||
|
|
|
@ -198,6 +198,16 @@ void tfsBasename(const STfsFile *pFile, char *dest);
|
|||
*/
|
||||
void tfsDirname(const STfsFile *pFile, char *dest);
|
||||
|
||||
/**
|
||||
* @brief Get the absolute file name of rname.
|
||||
*
|
||||
* @param pTfs
|
||||
* @param diskId
|
||||
* @param rname relative file name
|
||||
* @param aname absolute file name
|
||||
*/
|
||||
void tfsAbsoluteName(STfs *pTfs, SDiskID diskId, const char *rname, char *aname);
|
||||
|
||||
/**
|
||||
* @brief Remove file in tfs.
|
||||
*
|
||||
|
|
|
@ -25,10 +25,11 @@ extern "C" {
|
|||
#include <pthread.h>
|
||||
#include <semaphore.h>
|
||||
|
||||
#include <regex.h>
|
||||
|
||||
#if !defined(WINDOWS)
|
||||
#include <unistd.h>
|
||||
#include <dirent.h>
|
||||
#include <regex.h>
|
||||
#include <sched.h>
|
||||
#include <wordexp.h>
|
||||
#include <libgen.h>
|
||||
|
@ -36,6 +37,12 @@ extern "C" {
|
|||
#include <sys/utsname.h>
|
||||
#include <sys/param.h>
|
||||
#include <sys/mman.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/types.h>
|
||||
#include <termios.h>
|
||||
#include <sys/statvfs.h>
|
||||
|
||||
#if defined(DARWIN)
|
||||
#else
|
||||
|
@ -61,12 +68,7 @@ extern "C" {
|
|||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/types.h>
|
||||
#include <wchar.h>
|
||||
#include <termios.h>
|
||||
#include <wctype.h>
|
||||
|
||||
|
||||
|
|
|
@ -56,6 +56,7 @@ extern "C" {
|
|||
// specific
|
||||
typedef int (*__compar_fn_t)(const void *, const void *);
|
||||
#define ssize_t int
|
||||
#define _SSIZE_T_
|
||||
#define bzero(ptr, size) memset((ptr), 0, (size))
|
||||
#define strcasecmp _stricmp
|
||||
#define strncasecmp _strnicmp
|
||||
|
|
|
@ -24,7 +24,7 @@ extern "C" {
|
|||
|
||||
#if defined(WINDOWS)
|
||||
typedef int32_t FileFd;
|
||||
typedef SOCKET SocketFd;
|
||||
typedef int32_t SocketFd;
|
||||
#else
|
||||
typedef int32_t FileFd;
|
||||
typedef int32_t SocketFd;
|
||||
|
|
|
@ -25,6 +25,8 @@
|
|||
#define epoll_create EPOLL_CREATE_FUNC_TAOS_FORBID
|
||||
#define epoll_ctl EPOLL_CTL_FUNC_TAOS_FORBID
|
||||
#define epoll_wait EPOLL_WAIT_FUNC_TAOS_FORBID
|
||||
#define inet_addr INET_ADDR_FUNC_TAOS_FORBID
|
||||
#define inet_ntoa INET_NTOA_FUNC_TAOS_FORBID
|
||||
#endif
|
||||
|
||||
#if defined(WINDOWS)
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
#ifndef _TD_OS_SYSINFO_H_
|
||||
#define _TD_OS_SYSINFO_H_
|
||||
|
||||
#include <sys/statvfs.h>
|
||||
#include "os.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -75,7 +75,7 @@ typedef struct {
|
|||
#define TD_CODER_CURRENT(CODER) ((CODER)->data + (CODER)->pos)
|
||||
#define TD_CODER_MOVE_POS(CODER, MOVE) ((CODER)->pos += (MOVE))
|
||||
#define TD_CODER_CHECK_CAPACITY_FAILED(CODER, EXPSIZE) (((CODER)->size - (CODER)->pos) < (EXPSIZE))
|
||||
#define TCODER_MALLOC(SIZE, CODER) TFL_MALLOC(SIZE, &((CODER)->fl))
|
||||
#define TCODER_MALLOC(PTR, TYPE, SIZE, CODER) TFL_MALLOC(PTR, TYPE, SIZE, &((CODER)->fl))
|
||||
|
||||
void tCoderInit(SCoder* pCoder, td_endian_t endian, uint8_t* data, int32_t size, td_coder_t type);
|
||||
void tCoderClear(SCoder* pCoder);
|
||||
|
|
|
@ -29,15 +29,17 @@ struct SFreeListNode {
|
|||
|
||||
typedef TD_SLIST(SFreeListNode) SFreeList;
|
||||
|
||||
#define TFL_MALLOC(SIZE, LIST) \
|
||||
({ \
|
||||
#define TFL_MALLOC(PTR, TYPE, SIZE, LIST) \
|
||||
do { \
|
||||
void *ptr = malloc((SIZE) + sizeof(struct SFreeListNode)); \
|
||||
if (ptr) { \
|
||||
TD_SLIST_PUSH((LIST), (struct SFreeListNode *)ptr); \
|
||||
ptr = ((struct SFreeListNode *)ptr)->payload; \
|
||||
(PTR) = (TYPE)(ptr); \
|
||||
}else{ \
|
||||
(PTR) = NULL; \
|
||||
} \
|
||||
ptr; \
|
||||
})
|
||||
}while(0);
|
||||
|
||||
#define tFreeListInit(pFL) TD_SLIST_INIT(pFL)
|
||||
|
||||
|
|
|
@ -82,7 +82,7 @@ typedef uint16_t VarDataLenT; // maxVarDataLen: 32767
|
|||
#define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
|
||||
|
||||
#define varDataLen(v) ((VarDataLenT *)(v))[0]
|
||||
#define varDataVal(v) ((void *)((char *)v + VARSTR_HEADER_SIZE))
|
||||
#define varDataVal(v) ((char *)(v) + VARSTR_HEADER_SIZE)
|
||||
|
||||
typedef int32_t VarDataOffsetT;
|
||||
|
||||
|
|
|
@ -700,6 +700,10 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
}
|
||||
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||
tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp);
|
||||
pRsp->curBlock = 0;
|
||||
pRsp->curRow = 0;
|
||||
// TODO: alloc mem
|
||||
/*pRsp->*/
|
||||
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
|
||||
if (pRsp->consumeRsp.numOfTopics == 0) {
|
||||
/*printf("no data\n");*/
|
||||
|
@ -758,9 +762,9 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
goto END;
|
||||
}
|
||||
|
||||
// tmq's epoch is monotomically increase,
|
||||
// tmq's epoch is monotonically increase,
|
||||
// so it's safe to discard any old epoch msg.
|
||||
// epoch will only increase when received newer epoch ep msg
|
||||
// Epoch will only increase when received newer epoch ep msg
|
||||
SMqRspHead* head = pMsg->pData;
|
||||
int32_t epoch = atomic_load_32(&tmq->epoch);
|
||||
if (head->epoch <= epoch) {
|
||||
|
@ -1282,6 +1286,34 @@ const char* tmq_err2str(tmq_resp_err_t err) {
|
|||
return "fail";
|
||||
}
|
||||
|
||||
TAOS_ROW tmq_get_row(tmq_message_t* message) {
|
||||
SMqPollRsp* rsp = &message->consumeRsp;
|
||||
while (1) {
|
||||
if (message->curBlock < taosArrayGetSize(rsp->pBlockData)) {
|
||||
SSDataBlock* pBlock = taosArrayGet(rsp->pBlockData, message->curBlock);
|
||||
if (message->curRow < pBlock->info.rows) {
|
||||
for (int i = 0; i < pBlock->info.numOfCols; i++) {
|
||||
SColumnInfoData* pData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
if (colDataIsNull_s(pData, message->curRow))
|
||||
message->uData[i] = NULL;
|
||||
else {
|
||||
message->uData[i] = colDataGetData(pData, message->curRow);
|
||||
}
|
||||
}
|
||||
message->curRow++;
|
||||
return message->uData;
|
||||
} else {
|
||||
message->curBlock++;
|
||||
message->curRow = 0;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
char* tmq_get_topic_name(tmq_message_t* message) { return "not implemented yet"; }
|
||||
|
||||
#if 0
|
||||
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
|
||||
tmq_t* pTmq = malloc(sizeof(tmq_t));
|
||||
|
|
|
@ -240,10 +240,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co
|
|||
}
|
||||
|
||||
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
|
||||
ASSERT(pBlock);
|
||||
|
||||
size_t constantCols = (pBlock->pConstantList != NULL)? taosArrayGetSize(pBlock->pConstantList):0;
|
||||
ASSERT( pBlock->info.numOfCols == taosArrayGetSize(pBlock->pDataBlock) + constantCols);
|
||||
ASSERT(pBlock && pBlock->info.numOfCols == taosArrayGetSize(pBlock->pDataBlock));
|
||||
return pBlock->info.numOfCols;
|
||||
}
|
||||
|
||||
|
@ -1166,3 +1163,67 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) {
|
|||
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
|
||||
return pageSize / (blockDataGetSerialRowSize(pBlock) + blockDataGetSerialMetaSize(pBlock));
|
||||
}
|
||||
|
||||
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
|
||||
int64_t tbUid = pBlock->info.uid;
|
||||
int16_t numOfCols = pBlock->info.numOfCols;
|
||||
int16_t hasVarCol = pBlock->info.hasVarCol;
|
||||
int32_t rows = pBlock->info.rows;
|
||||
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
|
||||
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, tbUid);
|
||||
tlen += taosEncodeFixedI16(buf, numOfCols);
|
||||
tlen += taosEncodeFixedI16(buf, hasVarCol);
|
||||
tlen += taosEncodeFixedI32(buf, rows);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
|
||||
tlen += taosEncodeFixedI16(buf, pColData->info.colId);
|
||||
tlen += taosEncodeFixedI16(buf, pColData->info.type);
|
||||
tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColData->info.type)) {
|
||||
tlen += taosEncodeBinary(buf, pColData->varmeta.offset, sizeof(int32_t) * rows);
|
||||
} else {
|
||||
tlen += taosEncodeBinary(buf, pColData->nullbitmap, BitmapLen(rows));
|
||||
}
|
||||
|
||||
int32_t len = colDataGetLength(pColData, rows);
|
||||
taosEncodeFixedI32(buf, len);
|
||||
|
||||
tlen += taosEncodeBinary(buf, pColData->pData, len);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
||||
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
|
||||
int32_t sz;
|
||||
|
||||
buf = taosDecodeFixedI64(buf, &pBlock->info.uid);
|
||||
buf = taosDecodeFixedI16(buf, &pBlock->info.numOfCols);
|
||||
buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
|
||||
buf = taosDecodeFixedI32(buf, &pBlock->info.rows);
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SColumnInfoData data = {0};
|
||||
buf = taosDecodeFixedI16(buf, &data.info.colId);
|
||||
buf = taosDecodeFixedI16(buf, &data.info.type);
|
||||
buf = taosDecodeFixedI32(buf, &data.info.bytes);
|
||||
|
||||
if (IS_VAR_DATA_TYPE(data.info.type)) {
|
||||
buf = taosDecodeBinary(buf, (void**)&data.varmeta.offset, pBlock->info.rows * sizeof(int32_t));
|
||||
data.varmeta.length = pBlock->info.rows * sizeof(int32_t);
|
||||
data.varmeta.allocLen = data.varmeta.length;
|
||||
} else {
|
||||
buf = taosDecodeBinary(buf, (void**)&data.nullbitmap, BitmapLen(pBlock->info.rows));
|
||||
}
|
||||
|
||||
int32_t len = 0;
|
||||
buf = taosDecodeFixedI32(buf, &len);
|
||||
buf = taosDecodeBinary(buf, (void**)&data.pData, len);
|
||||
taosArrayPush(pBlock->pDataBlock, &data);
|
||||
}
|
||||
return (void*)buf;
|
||||
}
|
|
@ -2469,7 +2469,7 @@ int32_t tEncodeSMqCMCommitOffsetReq(SCoder *encoder, const SMqCMCommitOffsetReq
|
|||
int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq) {
|
||||
if (tStartDecode(decoder) < 0) return -1;
|
||||
if (tDecodeI32(decoder, &pReq->num) < 0) return -1;
|
||||
pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder);
|
||||
TCODER_MALLOC(pReq->offsets, SMqOffset*, pReq->num * sizeof(SMqOffset), decoder);
|
||||
if (pReq->offsets == NULL) return -1;
|
||||
for (int32_t i = 0; i < pReq->num; i++) {
|
||||
tDecodeSMqOffset(decoder, &pReq->offsets[i]);
|
||||
|
|
|
@ -217,7 +217,7 @@ int32_t tNameSetDbName(SName* dst, int32_t acct, const char* dbName, size_t name
|
|||
}
|
||||
|
||||
int32_t tNameSetAcctId(SName* dst, int32_t acctId) {
|
||||
assert(dst != NULL && acct != NULL);
|
||||
assert(dst != NULL);
|
||||
dst->acctId = acctId;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ static struct {
|
|||
} global = {0};
|
||||
|
||||
static void dndSigintHandle(int signum, void *info, void *ctx) {
|
||||
dInfo("singal:%d is received", signum);
|
||||
dInfo("signal:%d is received", signum);
|
||||
dndHandleEvent(global.pDnode, DND_EVENT_STOP);
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,22 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
|
|||
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
|
||||
// TODO encode tasks
|
||||
if (pObj->tasks) {
|
||||
int32_t sz = taosArrayGetSize(pObj->tasks);
|
||||
tEncodeI32(pEncoder, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SArray *pArray = taosArrayGet(pObj->tasks, i);
|
||||
int32_t innerSz = taosArrayGetSize(pArray);
|
||||
tEncodeI32(pEncoder, innerSz);
|
||||
for (int32_t j = 0; j < innerSz; j++) {
|
||||
SStreamTask *pTask = taosArrayGet(pArray, j);
|
||||
tEncodeSStreamTask(pEncoder, pTask);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tEncodeI32(pEncoder, 0);
|
||||
}
|
||||
return pEncoder->pos;
|
||||
}
|
||||
|
||||
|
@ -42,5 +58,23 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
|
|||
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
|
||||
if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;
|
||||
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
|
||||
int32_t sz;
|
||||
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
|
||||
if (sz != 0) {
|
||||
pObj->tasks = taosArrayInit(sz, sizeof(SArray));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int32_t innerSz;
|
||||
if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
|
||||
SArray *pArray = taosArrayInit(innerSz, sizeof(SStreamTask));
|
||||
for (int32_t j = 0; j < innerSz; j++) {
|
||||
SStreamTask task;
|
||||
if (tDecodeSStreamTask(pDecoder, &task) < 0) return -1;
|
||||
taosArrayPush(pArray, &task);
|
||||
}
|
||||
taosArrayPush(pObj->tasks, pArray);
|
||||
}
|
||||
} else {
|
||||
pObj->tasks = NULL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -77,6 +77,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
return -1;
|
||||
}
|
||||
taosArrayPush(taskOneLevel, pTask);
|
||||
|
||||
SCoder encoder;
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
|
||||
tEncodeSStreamTask(&encoder, pTask);
|
||||
int32_t tlen = sizeof(SMsgHead) + encoder.pos;
|
||||
tCoderClear(&encoder);
|
||||
void* buf = rpcMallocCont(tlen);
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
((SMsgHead*)buf)->streamTaskId = pTask->taskId;
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
|
||||
tEncodeSStreamTask(&encoder, pTask);
|
||||
tCoderClear(&encoder);
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = plan->execNode.epSet;
|
||||
action.pCont = buf;
|
||||
action.contLen = tlen;
|
||||
action.msgType = TDMT_VND_TASK_DEPLOY;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
rpcFreeCont(buf);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
} else if (plan->subplanType == SUBPLAN_TYPE_SCAN) {
|
||||
// duplicatable
|
||||
|
@ -101,6 +127,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
}
|
||||
|
||||
taosArrayPush(taskOneLevel, pTask);
|
||||
|
||||
SCoder encoder;
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
|
||||
tEncodeSStreamTask(&encoder, pTask);
|
||||
int32_t tlen = sizeof(SMsgHead) + encoder.pos;
|
||||
tCoderClear(&encoder);
|
||||
void* buf = rpcMallocCont(tlen);
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
((SMsgHead*)buf)->streamTaskId = pTask->taskId;
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
|
||||
tEncodeSStreamTask(&encoder, pTask);
|
||||
tCoderClear(&encoder);
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = plan->execNode.epSet;
|
||||
action.pCont = buf;
|
||||
action.contLen = tlen;
|
||||
action.msgType = TDMT_SND_TASK_DEPLOY;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
rpcFreeCont(buf);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// not duplicatable
|
||||
|
@ -117,6 +169,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
return -1;
|
||||
}
|
||||
taosArrayPush(taskOneLevel, pTask);
|
||||
|
||||
SCoder encoder;
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
|
||||
tEncodeSStreamTask(&encoder, pTask);
|
||||
int32_t tlen = sizeof(SMsgHead) + encoder.pos;
|
||||
tCoderClear(&encoder);
|
||||
void* buf = rpcMallocCont(tlen);
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
((SMsgHead*)buf)->streamTaskId = pTask->taskId;
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
|
||||
tEncodeSStreamTask(&encoder, pTask);
|
||||
tCoderClear(&encoder);
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = plan->execNode.epSet;
|
||||
action.pCont = buf;
|
||||
action.contLen = tlen;
|
||||
action.msgType = TDMT_SND_TASK_DEPLOY;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
rpcFreeCont(buf);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
taosArrayPush(pStream->tasks, taskOneLevel);
|
||||
}
|
||||
|
|
|
@ -230,6 +230,12 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
|
|||
}
|
||||
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
|
||||
|
||||
if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) {
|
||||
mError("stream:%ld, schedule stream since %s", streamObj.uid, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSdbRaw *pRedoRaw = mndStreamActionEncode(&streamObj);
|
||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||
|
@ -238,12 +244,6 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
|
|||
}
|
||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||
|
||||
if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) {
|
||||
mError("stream:%ld, schedule stream since %s", streamObj.uid, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
|
|
|
@ -55,6 +55,7 @@ int tqCommit(STQ*);
|
|||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
|
||||
int32_t tqProcessRebReq(STQ* pTq, char* msg);
|
||||
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -161,6 +161,7 @@ struct STQ {
|
|||
STqMemRef tqMemRef;
|
||||
STqMetaStore* tqMeta;
|
||||
STqPushMgr* tqPushMgr;
|
||||
SHashObj* pStreamTasks;
|
||||
SWal* pWal;
|
||||
SMeta* pVnodeMeta;
|
||||
};
|
||||
|
|
|
@ -55,6 +55,8 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, S
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
|
||||
return pTq;
|
||||
}
|
||||
|
||||
|
@ -416,3 +418,18 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
|
|||
terrno = TSDB_CODE_SUCCESS;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
SStreamTask* pTask = malloc(sizeof(SStreamTask));
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
SCoder decoder;
|
||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, (uint8_t*)msg, msgLen, TD_DECODER);
|
||||
tDecodeSStreamTask(&decoder, pTask);
|
||||
tCoderClear(&decoder);
|
||||
|
||||
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -20,8 +20,7 @@ static const char *TSDB_SMA_DNAME[] = {
|
|||
"tsma", // TSDB_SMA_TYPE_TIME_RANGE
|
||||
"rsma", // TSDB_SMA_TYPE_ROLLUP
|
||||
};
|
||||
#define SMA_CHECK_HASH
|
||||
#undef SMA_PRINT_DEBUG_LOG
|
||||
#undef _TEST_SMA_PRINT_DEBUG_LOG_
|
||||
#define SMA_STORAGE_TSDB_DAYS 30
|
||||
#define SMA_STORAGE_TSDB_TIMES 10
|
||||
#define SMA_STORAGE_SPLIT_HOURS 24
|
||||
|
@ -33,8 +32,8 @@ static const char *TSDB_SMA_DNAME[] = {
|
|||
#define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test
|
||||
#define SMA_TEST_INDEX_UID 2000000001 // TODO: just for test
|
||||
typedef enum {
|
||||
SMA_STORAGE_LEVEL_TSDB = 0, // use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2t200.dat
|
||||
SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/rsma/sma_index_uid/v2r200.dat
|
||||
SMA_STORAGE_LEVEL_TSDB = 0, // use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f200.tsma
|
||||
SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f1906.tsma
|
||||
} ESmaStorageLevel;
|
||||
|
||||
typedef struct {
|
||||
|
@ -47,6 +46,7 @@ typedef struct {
|
|||
int32_t iter;
|
||||
int32_t fid;
|
||||
} SmaFsIter;
|
||||
|
||||
typedef struct {
|
||||
STsdb * pTsdb;
|
||||
SDBFile dFile;
|
||||
|
@ -71,11 +71,12 @@ typedef struct {
|
|||
} SSmaStatItem;
|
||||
|
||||
struct SSmaStat {
|
||||
SHashObj *smaStatItems; // key: indexName, value: SSmaStatItem
|
||||
SHashObj *smaStatItems; // key: indexUid, value: SSmaStatItem
|
||||
T_REF_DECLARE()
|
||||
};
|
||||
|
||||
// declaration of static functions
|
||||
|
||||
// expired window
|
||||
static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg);
|
||||
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
|
||||
|
@ -159,22 +160,12 @@ static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (tsdbLockRepo(pTsdb) != 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (*pEnv == NULL) { // 2nd phase check
|
||||
if (*pEnv == NULL) {
|
||||
if ((*pEnv = tsdbNewSmaEnv(pTsdb, path)) == NULL) {
|
||||
tsdbUnlockRepo(pTsdb);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
if (tsdbUnlockRepo(pTsdb) != 0) {
|
||||
*pEnv = tsdbFreeSmaEnv(*pEnv);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -222,11 +213,14 @@ static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// TODO: lock. lazy mode when update expired window, or hungry mode during tsdbNew.
|
||||
/**
|
||||
* 1. Lazy mode utilized when init SSmaStat to update expired window(or hungry mode when tsdbNew).
|
||||
* 2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
|
||||
* tsdbInitSmaStat invoked in other multithread environment later.
|
||||
*/
|
||||
if (*pSmaStat == NULL) {
|
||||
*pSmaStat = (SSmaStat *)calloc(1, sizeof(SSmaStat));
|
||||
if (*pSmaStat == NULL) {
|
||||
// TODO: unlock
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
@ -236,11 +230,9 @@ static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) {
|
|||
|
||||
if ((*pSmaStat)->smaStatItems == NULL) {
|
||||
tfree(*pSmaStat);
|
||||
// TODO: unlock
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
// TODO: unlock
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -279,14 +271,17 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
|
|||
}
|
||||
|
||||
static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
|
||||
SSmaEnv *pEnv = NULL;
|
||||
|
||||
// return if already init
|
||||
switch (smaType) {
|
||||
case TSDB_SMA_TYPE_TIME_RANGE:
|
||||
if (pTsdb->pTSmaEnv) {
|
||||
if ((pEnv = (SSmaEnv *)atomic_load_ptr(&pTsdb->pTSmaEnv)) != NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
break;
|
||||
case TSDB_SMA_TYPE_ROLLUP:
|
||||
if (pTsdb->pRSmaEnv) {
|
||||
if ((pEnv = (SSmaEnv *)atomic_load_ptr(&pTsdb->pRSmaEnv)) != NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
break;
|
||||
|
@ -295,18 +290,38 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
// SDiskID did = {0};
|
||||
SSmaEnv *pEnv = NULL;
|
||||
char smaPath[TSDB_FILENAME_LEN] = "/proj/.sma/";
|
||||
if (tsdbInitSmaEnv(pTsdb, smaPath, &pEnv) != TSDB_CODE_SUCCESS) {
|
||||
// init sma env
|
||||
tsdbLockRepo(pTsdb);
|
||||
if (pTsdb->pTSmaEnv == NULL) {
|
||||
char rname[TSDB_FILENAME_LEN] = {0};
|
||||
char aname[TSDB_FILENAME_LEN * 2 + 32] = {0}; // TODO: make TMPNAME_LEN public as TSDB_FILENAME_LEN?
|
||||
|
||||
SDiskID did = {0};
|
||||
tfsAllocDisk(pTsdb->pTfs, TFS_PRIMARY_LEVEL, &did);
|
||||
if (did.level < 0 || did.id < 0) {
|
||||
tsdbUnlockRepo(pTsdb);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
tsdbGetSmaDir(REPO_ID(pTsdb), smaType, rname);
|
||||
tfsAbsoluteName(pTsdb->pTfs, did, rname, aname);
|
||||
|
||||
if (tfsMkdirRecurAt(pTsdb->pTfs, rname, did) != TSDB_CODE_SUCCESS) {
|
||||
tsdbUnlockRepo(pTsdb);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (tsdbInitSmaEnv(pTsdb, aname, &pEnv) != TSDB_CODE_SUCCESS) {
|
||||
tsdbUnlockRepo(pTsdb);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
||||
pTsdb->pTSmaEnv = pEnv;
|
||||
atomic_store_ptr(&pTsdb->pTSmaEnv, pEnv);
|
||||
} else {
|
||||
pTsdb->pRSmaEnv = pEnv;
|
||||
atomic_store_ptr(&pTsdb->pRSmaEnv, pEnv);
|
||||
}
|
||||
}
|
||||
tsdbUnlockRepo(pTsdb);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
};
|
||||
|
@ -379,7 +394,7 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) {
|
|||
|
||||
int8_t state = TSDB_SMA_STAT_EXPIRED;
|
||||
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
|
||||
if (taosHashPut(pItem->expiredWindows, (void *)(expiredWindows + i), sizeof(TSKEY), &state, sizeof(state)) != 0) {
|
||||
if (taosHashPut(pItem->expiredWindows, expiredWindows + i, sizeof(TSKEY), &state, sizeof(state)) != 0) {
|
||||
// If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would
|
||||
// tell query module to query raw TS data.
|
||||
// N.B.
|
||||
|
@ -497,12 +512,12 @@ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t k
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
#ifdef SMA_PRINT_DEBUG_LOG
|
||||
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
|
||||
uint32_t valueSize = 0;
|
||||
void * data = tsdbGetSmaDataByKey(pDBFile, smaKey, keyLen, &valueSize);
|
||||
ASSERT(data != NULL);
|
||||
for (uint32_t v = 0; v < valueSize; v += 8) {
|
||||
tsdbWarn("vgId:%d sma data - val[%d] is %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v));
|
||||
tsdbWarn("vgId:%d insert sma data val[%d] %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v));
|
||||
}
|
||||
#endif
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -935,12 +950,14 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
|
|||
tsdbCloseDBF(&tReadH.dFile);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
tfree(result);
|
||||
#ifdef SMA_PRINT_DEBUG_LOG
|
||||
|
||||
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
|
||||
for (uint32_t v = 0; v < valueSize; v += 8) {
|
||||
tsdbWarn("vgId:%d v[%d]=%" PRIi64, REPO_ID(pTsdb), v, *(int64_t *)POINTER_SHIFT(result, v));
|
||||
tsdbWarn("vgId:%d get sma data v[%d]=%" PRIi64, REPO_ID(pTsdb), v, *(int64_t *)POINTER_SHIFT(result, v));
|
||||
}
|
||||
#endif
|
||||
tfree(result); // TODO: fill the result to output
|
||||
|
||||
#if 0
|
||||
int32_t nResult = 0;
|
||||
int64_t lastKey = 0;
|
||||
|
|
|
@ -132,6 +132,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
||||
}
|
||||
} break;
|
||||
case TDMT_VND_TASK_DEPLOY: {
|
||||
if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||
}
|
||||
} break;
|
||||
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
|
||||
SSmaCfg vCreateSmaReq = {0};
|
||||
if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) {
|
||||
|
|
|
@ -301,8 +301,14 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
|||
break;
|
||||
}
|
||||
|
||||
SDiskCfg pDisks = {.level = 0, .primary = 1};
|
||||
strncpy(pDisks.dir, "/var/lib/taos", TSDB_FILENAME_LEN);
|
||||
int32_t numOfDisks = 1;
|
||||
tsdb.pTfs = tfsOpen(&pDisks, numOfDisks);
|
||||
ASSERT_NE(tsdb.pTfs, nullptr);
|
||||
|
||||
char *msg = (char *)calloc(1, 100);
|
||||
assert(msg != NULL);
|
||||
ASSERT_NE(msg, nullptr);
|
||||
ASSERT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0);
|
||||
|
||||
// init
|
||||
|
|
|
@ -2349,6 +2349,9 @@ _return:
|
|||
CTG_API_LEAVE(code);
|
||||
}
|
||||
|
||||
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet) {
|
||||
|
||||
}
|
||||
|
||||
int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) {
|
||||
CTG_API_ENTER();
|
||||
|
@ -2394,6 +2397,9 @@ _return:
|
|||
CTG_API_LEAVE(code);
|
||||
}
|
||||
|
||||
int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, const char *pIndexName, SIndexMeta** pIndexMeta) {
|
||||
|
||||
}
|
||||
|
||||
int32_t catalogGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
|
||||
CTG_API_ENTER();
|
||||
|
@ -2663,11 +2669,14 @@ _return:
|
|||
int32_t catalogGetQnodeList(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
int32_t code = 0;
|
||||
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeList) {
|
||||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
//TODO
|
||||
CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pRpc, pMgmtEps, &pQnodeList));
|
||||
|
||||
_return:
|
||||
|
||||
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
|
|
@ -89,7 +89,7 @@ static SSubLogicPlan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode*
|
|||
pSubplan->id.groupId = pCxt->groupId;
|
||||
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
|
||||
pSubplan->pNode = (SLogicNode*)nodesCloneNode(pScan);
|
||||
TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList, SVgroupsInfo);
|
||||
TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList, SVgroupsInfo*);
|
||||
SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, SPLIT_FLAG_STS);
|
||||
return pSubplan;
|
||||
}
|
||||
|
|
|
@ -163,15 +163,15 @@ typedef struct SyncClientRequest {
|
|||
} SyncClientRequest;
|
||||
|
||||
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen);
|
||||
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak);
|
||||
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak); // step 1
|
||||
void syncClientRequestDestroy(SyncClientRequest* pMsg);
|
||||
void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen);
|
||||
void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg);
|
||||
char* syncClientRequestSerialize2(const SyncClientRequest* pMsg, uint32_t* len);
|
||||
SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len);
|
||||
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg);
|
||||
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); // step 2
|
||||
void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg);
|
||||
SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||
SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg); // step 3
|
||||
cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg);
|
||||
char* syncClientRequest2Str(const SyncClientRequest* pMsg);
|
||||
|
||||
|
|
|
@ -40,12 +40,13 @@ typedef struct SSyncRaftEntry {
|
|||
} SSyncRaftEntry;
|
||||
|
||||
SSyncRaftEntry* syncEntryBuild(uint32_t dataLen);
|
||||
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index);
|
||||
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); // step 4
|
||||
void syncEntryDestory(SSyncRaftEntry* pEntry);
|
||||
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len);
|
||||
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len);
|
||||
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5
|
||||
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); // step 6
|
||||
cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry);
|
||||
char* syncEntry2Str(const SSyncRaftEntry* pEntry);
|
||||
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg); // step 7
|
||||
|
||||
// for debug ----------------------
|
||||
void syncEntryPrint(const SSyncRaftEntry* pObj);
|
||||
|
|
|
@ -101,7 +101,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
assert(pMsg->dataLen >= 0);
|
||||
|
||||
SyncTerm localPreLogTerm = 0;
|
||||
if (pMsg->prevLogTerm >= SYNC_INDEX_BEGIN && pMsg->prevLogTerm <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||
SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex);
|
||||
assert(pEntry != NULL);
|
||||
localPreLogTerm = pEntry->term;
|
||||
|
@ -174,7 +174,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
pReply->destId = pMsg->srcId;
|
||||
pReply->term = ths->pRaftStore->currentTerm;
|
||||
pReply->success = true;
|
||||
|
||||
if (pMsg->dataLen > 0) {
|
||||
pReply->matchIndex = pMsg->prevLogIndex + 1;
|
||||
} else {
|
||||
pReply->matchIndex = pMsg->prevLogIndex;
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||
|
|
|
@ -39,7 +39,8 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
|
|||
syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg);
|
||||
|
||||
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
||||
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, ths->pRaftStore->currentTerm);
|
||||
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
|
||||
ths->pRaftStore->currentTerm);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "syncIndexMgr.h"
|
||||
#include "syncInt.h"
|
||||
#include "syncRaftLog.h"
|
||||
|
||||
// \* Leader i advances its commitIndex.
|
||||
// \* This is done as a separate step from handling AppendEntries responses,
|
||||
|
@ -42,4 +43,27 @@
|
|||
void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex);
|
||||
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex);
|
||||
|
||||
// update commit index
|
||||
|
||||
if (pSyncNode->pFsm != NULL) {
|
||||
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
||||
SyncIndex endIndex = SYNC_INDEX_INVALID;
|
||||
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
|
||||
if (i != SYNC_INDEX_INVALID) {
|
||||
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i);
|
||||
assert(pEntry != NULL);
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
if (pSyncNode->pFsm->FpCommitCb != NULL) {
|
||||
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
|
||||
}
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -54,8 +54,9 @@ static void syncEnvTick(void *param, void *tmrId) {
|
|||
SSyncEnv *pSyncEnv = (SSyncEnv *)param;
|
||||
if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) {
|
||||
++(pSyncEnv->envTickTimerCounter);
|
||||
sTrace(
|
||||
"syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 ", envTickTimerCounter:%" PRIu64 ", "
|
||||
sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
|
||||
", envTickTimerCounter:%" PRIu64
|
||||
", "
|
||||
"envTickTimerMS:%d, tmrId:%p",
|
||||
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
|
||||
pSyncEnv->envTickTimerMS, tmrId);
|
||||
|
@ -63,8 +64,9 @@ static void syncEnvTick(void *param, void *tmrId) {
|
|||
// do something, tick ...
|
||||
taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer);
|
||||
} else {
|
||||
sTrace(
|
||||
"syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 ", envTickTimerCounter:%" PRIu64 ", "
|
||||
sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
|
||||
", envTickTimerCounter:%" PRIu64
|
||||
", "
|
||||
"envTickTimerMS:%d, tmrId:%p",
|
||||
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
|
||||
pSyncEnv->envTickTimerMS, tmrId);
|
||||
|
|
|
@ -731,14 +731,44 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
|
|||
int32_t ret = 0;
|
||||
syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg);
|
||||
|
||||
SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
|
||||
SyncTerm term = ths->pRaftStore->currentTerm;
|
||||
SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
|
||||
assert(pEntry != NULL);
|
||||
|
||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||
SSyncRaftEntry* pEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
|
||||
ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
|
||||
|
||||
// only myself, maybe commit
|
||||
syncNodeMaybeAdvanceCommitIndex(ths);
|
||||
|
||||
// start replicate right now!
|
||||
syncNodeReplicate(ths);
|
||||
syncEntryDestory(pEntry);
|
||||
|
||||
// pre commit
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
if (ths->pFsm != NULL) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL) {
|
||||
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
|
||||
}
|
||||
}
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
|
||||
} else {
|
||||
// ths->pFsm->FpCommitCb(-1)
|
||||
// pre commit
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
if (ths->pFsm != NULL) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL) {
|
||||
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -1);
|
||||
}
|
||||
}
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
}
|
||||
|
||||
syncEntryDestory(pEntry);
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -76,7 +76,7 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
|
|||
free(s);
|
||||
|
||||
} else {
|
||||
pRoot = syncRpcUnknownMsg2Json();
|
||||
pRoot = cJSON_CreateObject();
|
||||
char* s;
|
||||
s = syncUtilprintBin((char*)(pRpcMsg->pCont), pRpcMsg->contLen);
|
||||
cJSON_AddStringToObject(pRoot, "pCont", s);
|
||||
|
@ -608,6 +608,7 @@ SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) {
|
|||
return pMsg;
|
||||
}
|
||||
|
||||
// step 1. original SRpcMsg => SyncClientRequest, add seqNum, isWeak
|
||||
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak) {
|
||||
SyncClientRequest* pMsg = syncClientRequestBuild(pOriginalRpcMsg->contLen);
|
||||
pMsg->originalRpcType = pOriginalRpcMsg->msgType;
|
||||
|
@ -652,6 +653,7 @@ SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len)
|
|||
return pMsg;
|
||||
}
|
||||
|
||||
// step 2. SyncClientRequest => RpcMsg, to queue
|
||||
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg) {
|
||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||
pRpcMsg->msgType = pMsg->msgType;
|
||||
|
@ -664,6 +666,7 @@ void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg
|
|||
syncClientRequestDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||
}
|
||||
|
||||
// step 3. RpcMsg => SyncClientRequest, from queue
|
||||
SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||
SyncClientRequest* pMsg = syncClientRequestDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
return pMsg;
|
||||
|
|
|
@ -26,6 +26,7 @@ SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) {
|
|||
return pEntry;
|
||||
}
|
||||
|
||||
// step 4. SyncClientRequest => SSyncRaftEntry, add term, index
|
||||
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {
|
||||
SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen);
|
||||
assert(pEntry != NULL);
|
||||
|
@ -48,6 +49,7 @@ void syncEntryDestory(SSyncRaftEntry* pEntry) {
|
|||
}
|
||||
}
|
||||
|
||||
// step 5. SSyncRaftEntry => bin, to raft log
|
||||
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) {
|
||||
char* buf = malloc(pEntry->bytes);
|
||||
assert(buf != NULL);
|
||||
|
@ -58,6 +60,7 @@ char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) {
|
|||
return buf;
|
||||
}
|
||||
|
||||
// step 6. bin => SSyncRaftEntry, from raft log
|
||||
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) {
|
||||
uint32_t bytes = *((uint32_t*)buf);
|
||||
SSyncRaftEntry* pEntry = malloc(bytes);
|
||||
|
@ -106,6 +109,15 @@ char* syncEntry2Str(const SSyncRaftEntry* pEntry) {
|
|||
return serialized;
|
||||
}
|
||||
|
||||
// step 7. SSyncRaftEntry => original SRpcMsg, commit to user, delete seqNum, isWeak, term, index
|
||||
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
|
||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||
pRpcMsg->msgType = pEntry->originalRpcType;
|
||||
pRpcMsg->contLen = pEntry->dataLen;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
|
||||
}
|
||||
|
||||
// for debug ----------------------
|
||||
void syncEntryPrint(const SSyncRaftEntry* pObj) {
|
||||
char* serialized = syncEntry2Str(pObj);
|
||||
|
|
|
@ -73,7 +73,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
|
|||
SyncAppendEntries* pMsg = NULL;
|
||||
SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex);
|
||||
if (pEntry != NULL) {
|
||||
SyncAppendEntries* pMsg = syncAppendEntriesBuild(pEntry->bytes);
|
||||
pMsg = syncAppendEntriesBuild(pEntry->bytes);
|
||||
assert(pMsg != NULL);
|
||||
|
||||
// add pEntry into msg
|
||||
uint32_t len;
|
||||
|
@ -86,9 +87,11 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
|
|||
|
||||
} else {
|
||||
// maybe overflow, send empty record
|
||||
SyncAppendEntries* pMsg = syncAppendEntriesBuild(0);
|
||||
pMsg = syncAppendEntriesBuild(0);
|
||||
assert(pMsg != NULL);
|
||||
}
|
||||
|
||||
assert(pMsg != NULL);
|
||||
pMsg->srcId = pSyncNode->myRaftId;
|
||||
pMsg->destId = *pDestId;
|
||||
pMsg->term = pSyncNode->pRaftStore->currentTerm;
|
||||
|
|
|
@ -41,7 +41,8 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
|
|||
syncRequestVoteReplyLog2("==syncNodeOnRequestVoteReplyCb==", pMsg);
|
||||
|
||||
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
||||
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, ths->pRaftStore->currentTerm);
|
||||
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
|
||||
ths->pRaftStore->currentTerm);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
@ -14,15 +14,12 @@
|
|||
*/
|
||||
|
||||
#include "syncUtil.h"
|
||||
#include <arpa/inet.h>
|
||||
#include <netinet/in.h>
|
||||
#include <sys/socket.h>
|
||||
#include "syncEnv.h"
|
||||
|
||||
// ---- encode / decode
|
||||
uint64_t syncUtilAddr2U64(const char* host, uint16_t port) {
|
||||
uint64_t u64;
|
||||
uint32_t hostU32 = (uint32_t)inet_addr(host);
|
||||
uint32_t hostU32 = (uint32_t)taosInetAddr(host);
|
||||
// assert(hostU32 != (uint32_t)-1);
|
||||
u64 = (((uint64_t)hostU32) << 32) | (((uint32_t)port) << 16);
|
||||
return u64;
|
||||
|
@ -33,7 +30,7 @@ void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port) {
|
|||
|
||||
struct in_addr addr;
|
||||
addr.s_addr = hostU32;
|
||||
snprintf(host, len, "%s", inet_ntoa(addr));
|
||||
snprintf(host, len, "%s", taosInetNtoa(addr));
|
||||
*port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16);
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,8 @@ add_executable(syncRpcMsgTest "")
|
|||
add_executable(syncPingTimerTest2 "")
|
||||
add_executable(syncPingSelfTest "")
|
||||
add_executable(syncElectTest "")
|
||||
add_executable(syncEncodeTest "")
|
||||
add_executable(syncWriteTest "")
|
||||
|
||||
|
||||
target_sources(syncTest
|
||||
|
@ -150,6 +152,14 @@ target_sources(syncElectTest
|
|||
PRIVATE
|
||||
"syncElectTest.cpp"
|
||||
)
|
||||
target_sources(syncEncodeTest
|
||||
PRIVATE
|
||||
"syncEncodeTest.cpp"
|
||||
)
|
||||
target_sources(syncWriteTest
|
||||
PRIVATE
|
||||
"syncWriteTest.cpp"
|
||||
)
|
||||
|
||||
|
||||
target_include_directories(syncTest
|
||||
|
@ -307,6 +317,16 @@ target_include_directories(syncElectTest
|
|||
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
target_include_directories(syncEncodeTest
|
||||
PUBLIC
|
||||
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
target_include_directories(syncWriteTest
|
||||
PUBLIC
|
||||
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
|
||||
|
||||
target_link_libraries(syncTest
|
||||
|
@ -429,6 +449,14 @@ target_link_libraries(syncElectTest
|
|||
sync
|
||||
gtest_main
|
||||
)
|
||||
target_link_libraries(syncEncodeTest
|
||||
sync
|
||||
gtest_main
|
||||
)
|
||||
target_link_libraries(syncWriteTest
|
||||
sync
|
||||
gtest_main
|
||||
)
|
||||
|
||||
|
||||
enable_testing()
|
||||
|
|
|
@ -116,7 +116,9 @@ int main(int argc, char** argv) {
|
|||
|
||||
//---------------------------
|
||||
while (1) {
|
||||
sTrace("while 1 sleep, state: %d, %s", gSyncNode->state, syncUtilState2String(gSyncNode->state));
|
||||
sTrace("while 1 sleep, state: %d, %s, electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d",
|
||||
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->electTimerLogicClock,
|
||||
gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS);
|
||||
taosMsleep(1000);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,204 @@
|
|||
#include <gtest/gtest.h>
|
||||
#include <stdio.h>
|
||||
#include "syncEnv.h"
|
||||
#include "syncIO.h"
|
||||
#include "syncInt.h"
|
||||
#include "syncMessage.h"
|
||||
#include "syncRaftEntry.h"
|
||||
#include "syncRaftLog.h"
|
||||
#include "syncRaftStore.h"
|
||||
#include "syncUtil.h"
|
||||
|
||||
void logTest() {
|
||||
sTrace("--- sync log test: trace");
|
||||
sDebug("--- sync log test: debug");
|
||||
sInfo("--- sync log test: info");
|
||||
sWarn("--- sync log test: warn");
|
||||
sError("--- sync log test: error");
|
||||
sFatal("--- sync log test: fatal");
|
||||
}
|
||||
|
||||
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
|
||||
int32_t replicaNum = 1;
|
||||
int32_t myIndex = 0;
|
||||
|
||||
SRaftId ids[TSDB_MAX_REPLICA];
|
||||
SSyncInfo syncInfo;
|
||||
SSyncFSM * pFsm;
|
||||
SWal * pWal;
|
||||
SSyncNode *pSyncNode;
|
||||
|
||||
SSyncNode *syncNodeInit() {
|
||||
syncInfo.vgId = 1234;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = pFsm;
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
|
||||
|
||||
int code = walInit();
|
||||
assert(code == 0);
|
||||
SWalCfg walCfg;
|
||||
memset(&walCfg, 0, sizeof(SWalCfg));
|
||||
walCfg.vgId = syncInfo.vgId;
|
||||
walCfg.fsyncPeriod = 1000;
|
||||
walCfg.retentionPeriod = 1000;
|
||||
walCfg.rollPeriod = 1000;
|
||||
walCfg.retentionSize = 1000;
|
||||
walCfg.segSize = 1000;
|
||||
walCfg.level = TAOS_WAL_FSYNC;
|
||||
pWal = walOpen("./wal_test", &walCfg);
|
||||
assert(pWal != NULL);
|
||||
|
||||
syncInfo.pWal = pWal;
|
||||
|
||||
SSyncCfg *pCfg = &syncInfo.syncCfg;
|
||||
pCfg->myIndex = myIndex;
|
||||
pCfg->replicaNum = replicaNum;
|
||||
|
||||
for (int i = 0; i < replicaNum; ++i) {
|
||||
pCfg->nodeInfo[i].nodePort = ports[i];
|
||||
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
|
||||
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
|
||||
}
|
||||
|
||||
pSyncNode = syncNodeOpen(&syncInfo);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
|
||||
return pSyncNode;
|
||||
}
|
||||
|
||||
SSyncNode *syncInitTest() { return syncNodeInit(); }
|
||||
|
||||
void initRaftId(SSyncNode *pSyncNode) {
|
||||
for (int i = 0; i < replicaNum; ++i) {
|
||||
ids[i] = pSyncNode->replicasId[i];
|
||||
char *s = syncUtilRaftId2Str(&ids[i]);
|
||||
printf("raftId[%d] : %s\n", i, s);
|
||||
free(s);
|
||||
}
|
||||
}
|
||||
|
||||
SRpcMsg *step0() {
|
||||
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg));
|
||||
memset(pMsg, 0, sizeof(SRpcMsg));
|
||||
pMsg->msgType = 9999;
|
||||
pMsg->contLen = 32;
|
||||
pMsg->pCont = malloc(pMsg->contLen);
|
||||
snprintf((char *)(pMsg->pCont), pMsg->contLen, "hello, world");
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
SyncClientRequest *step1(const SRpcMsg *pMsg) {
|
||||
SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
SRpcMsg *step2(const SyncClientRequest *pMsg) {
|
||||
SRpcMsg *pRetMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg));
|
||||
syncClientRequest2RpcMsg(pMsg, pRetMsg);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
SyncClientRequest *step3(const SRpcMsg *pMsg) {
|
||||
SyncClientRequest *pRetMsg = syncClientRequestFromRpcMsg2(pMsg);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
SSyncRaftEntry *step4(const SyncClientRequest *pMsg) {
|
||||
SSyncRaftEntry *pRetMsg = syncEntryBuild2((SyncClientRequest *)pMsg, 100, 0);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
char *step5(const SSyncRaftEntry *pMsg, uint32_t *len) {
|
||||
char *pRetMsg = syncEntrySerialize(pMsg, len);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
SSyncRaftEntry *step6(const char *pMsg, uint32_t len) {
|
||||
SSyncRaftEntry *pRetMsg = syncEntryDeserialize(pMsg, len);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
SRpcMsg *step7(const SSyncRaftEntry *pMsg) {
|
||||
SRpcMsg *pRetMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg));
|
||||
syncEntry2OriginalRpc(pMsg, pRetMsg);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
// taosInitLog((char *)"syncTest.log", 100000, 10);
|
||||
tsAsyncLog = 0;
|
||||
sDebugFlag = 143 + 64;
|
||||
void logTest();
|
||||
|
||||
myIndex = 0;
|
||||
if (argc >= 2) {
|
||||
myIndex = atoi(argv[1]);
|
||||
}
|
||||
|
||||
int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]);
|
||||
assert(ret == 0);
|
||||
|
||||
ret = syncEnvStart();
|
||||
assert(ret == 0);
|
||||
|
||||
taosRemoveDir("./wal_test");
|
||||
|
||||
// step0
|
||||
SRpcMsg *pMsg0 = step0();
|
||||
syncRpcMsgPrint2((char *)"==step0==", pMsg0);
|
||||
|
||||
// step1
|
||||
SyncClientRequest *pMsg1 = step1(pMsg0);
|
||||
syncClientRequestPrint2((char *)"==step1==", pMsg1);
|
||||
|
||||
// step2
|
||||
SRpcMsg *pMsg2 = step2(pMsg1);
|
||||
syncRpcMsgPrint2((char *)"==step2==", pMsg2);
|
||||
|
||||
// step3
|
||||
SyncClientRequest *pMsg3 = step3(pMsg2);
|
||||
syncClientRequestPrint2((char *)"==step3==", pMsg3);
|
||||
|
||||
// step4
|
||||
SSyncRaftEntry *pMsg4 = step4(pMsg3);
|
||||
syncEntryPrint2((char *)"==step4==", pMsg4);
|
||||
|
||||
// log, relog
|
||||
SSyncNode *pSyncNode = syncNodeInit();
|
||||
assert(pSyncNode != NULL);
|
||||
SSyncRaftEntry *pEntry = pMsg4;
|
||||
pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry);
|
||||
SSyncRaftEntry *pEntry2 = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, pEntry->index);
|
||||
syncEntryPrint2((char *)"==pEntry2==", pEntry2);
|
||||
|
||||
// step5
|
||||
uint32_t len;
|
||||
char * pMsg5 = step5(pMsg4, &len);
|
||||
char * s = syncUtilprintBin(pMsg5, len);
|
||||
printf("==step5== [%s] \n", s);
|
||||
free(s);
|
||||
|
||||
// step6
|
||||
SSyncRaftEntry *pMsg6 = step6(pMsg5, len);
|
||||
syncEntryPrint2((char *)"==step6==", pMsg6);
|
||||
|
||||
// step7
|
||||
SRpcMsg *pMsg7 = step7(pMsg6);
|
||||
syncRpcMsgPrint2((char *)"==step7==", pMsg7);
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,180 @@
|
|||
#include <gtest/gtest.h>
|
||||
#include <stdio.h>
|
||||
#include "syncEnv.h"
|
||||
#include "syncIO.h"
|
||||
#include "syncInt.h"
|
||||
#include "syncMessage.h"
|
||||
#include "syncRaftEntry.h"
|
||||
#include "syncRaftLog.h"
|
||||
#include "syncRaftStore.h"
|
||||
#include "syncUtil.h"
|
||||
|
||||
void logTest() {
|
||||
sTrace("--- sync log test: trace");
|
||||
sDebug("--- sync log test: debug");
|
||||
sInfo("--- sync log test: info");
|
||||
sWarn("--- sync log test: warn");
|
||||
sError("--- sync log test: error");
|
||||
sFatal("--- sync log test: fatal");
|
||||
}
|
||||
|
||||
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
|
||||
int32_t replicaNum = 1;
|
||||
int32_t myIndex = 0;
|
||||
|
||||
SRaftId ids[TSDB_MAX_REPLICA];
|
||||
SSyncInfo syncInfo;
|
||||
SSyncFSM * pFsm;
|
||||
SWal * pWal;
|
||||
SSyncNode *gSyncNode;
|
||||
|
||||
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
|
||||
printf("==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
|
||||
syncRpcMsgPrint2((char *)"==CommitCb==", (SRpcMsg *)pBuf);
|
||||
}
|
||||
|
||||
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
|
||||
printf("==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
|
||||
syncRpcMsgPrint2((char *)"==PreCommitCb==", (SRpcMsg *)pBuf);
|
||||
}
|
||||
|
||||
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
|
||||
printf("==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
|
||||
syncRpcMsgPrint2((char *)"==RollBackCb==", (SRpcMsg *)pBuf);
|
||||
}
|
||||
|
||||
void initFsm() {
|
||||
pFsm = (SSyncFSM *)malloc(sizeof(SSyncFSM));
|
||||
pFsm->FpCommitCb = CommitCb;
|
||||
pFsm->FpPreCommitCb = PreCommitCb;
|
||||
pFsm->FpRollBackCb = RollBackCb;
|
||||
}
|
||||
|
||||
SSyncNode *syncNodeInit() {
|
||||
syncInfo.vgId = 1234;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = pFsm;
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./write_test");
|
||||
|
||||
int code = walInit();
|
||||
assert(code == 0);
|
||||
SWalCfg walCfg;
|
||||
memset(&walCfg, 0, sizeof(SWalCfg));
|
||||
walCfg.vgId = syncInfo.vgId;
|
||||
walCfg.fsyncPeriod = 1000;
|
||||
walCfg.retentionPeriod = 1000;
|
||||
walCfg.rollPeriod = 1000;
|
||||
walCfg.retentionSize = 1000;
|
||||
walCfg.segSize = 1000;
|
||||
walCfg.level = TAOS_WAL_FSYNC;
|
||||
pWal = walOpen("./write_test_wal", &walCfg);
|
||||
assert(pWal != NULL);
|
||||
|
||||
syncInfo.pWal = pWal;
|
||||
|
||||
SSyncCfg *pCfg = &syncInfo.syncCfg;
|
||||
pCfg->myIndex = myIndex;
|
||||
pCfg->replicaNum = replicaNum;
|
||||
|
||||
for (int i = 0; i < replicaNum; ++i) {
|
||||
pCfg->nodeInfo[i].nodePort = ports[i];
|
||||
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
|
||||
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
|
||||
}
|
||||
|
||||
SSyncNode *pSyncNode = syncNodeOpen(&syncInfo);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
|
||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
|
||||
gSyncIO->pSyncNode = pSyncNode;
|
||||
|
||||
return pSyncNode;
|
||||
}
|
||||
|
||||
SSyncNode *syncInitTest() { return syncNodeInit(); }
|
||||
|
||||
void initRaftId(SSyncNode *pSyncNode) {
|
||||
for (int i = 0; i < replicaNum; ++i) {
|
||||
ids[i] = pSyncNode->replicasId[i];
|
||||
char *s = syncUtilRaftId2Str(&ids[i]);
|
||||
printf("raftId[%d] : %s\n", i, s);
|
||||
free(s);
|
||||
}
|
||||
}
|
||||
|
||||
SRpcMsg *step0() {
|
||||
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg));
|
||||
memset(pMsg, 0, sizeof(SRpcMsg));
|
||||
pMsg->msgType = 9999;
|
||||
pMsg->contLen = 32;
|
||||
pMsg->pCont = malloc(pMsg->contLen);
|
||||
snprintf((char *)(pMsg->pCont), pMsg->contLen, "hello, world");
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
SyncClientRequest *step1(const SRpcMsg *pMsg) {
|
||||
SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true);
|
||||
return pRetMsg;
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
// taosInitLog((char *)"syncTest.log", 100000, 10);
|
||||
tsAsyncLog = 0;
|
||||
sDebugFlag = 143 + 64;
|
||||
void logTest();
|
||||
|
||||
myIndex = 0;
|
||||
if (argc >= 2) {
|
||||
myIndex = atoi(argv[1]);
|
||||
}
|
||||
|
||||
int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]);
|
||||
assert(ret == 0);
|
||||
|
||||
ret = syncEnvStart();
|
||||
assert(ret == 0);
|
||||
|
||||
taosRemoveDir("./wal_test");
|
||||
|
||||
initFsm();
|
||||
|
||||
gSyncNode = syncInitTest();
|
||||
assert(gSyncNode != NULL);
|
||||
syncNodePrint2((char *)"", gSyncNode);
|
||||
|
||||
initRaftId(gSyncNode);
|
||||
|
||||
// step0
|
||||
SRpcMsg *pMsg0 = step0();
|
||||
syncRpcMsgPrint2((char *)"==step0==", pMsg0);
|
||||
|
||||
// step1
|
||||
SyncClientRequest *pMsg1 = step1(pMsg0);
|
||||
syncClientRequestPrint2((char *)"==step1==", pMsg1);
|
||||
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
SyncClientRequest *pSyncClientRequest = pMsg1;
|
||||
SRpcMsg rpcMsg;
|
||||
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
|
||||
gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg);
|
||||
|
||||
taosMsleep(1000);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
sTrace("while 1 sleep");
|
||||
taosMsleep(1000);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -27,7 +27,8 @@ typedef struct {
|
|||
} SBtIdx;
|
||||
|
||||
// Btree page header definition
|
||||
typedef struct __attribute__((__packed__)) {
|
||||
#pragma pack (push,1)
|
||||
typedef struct {
|
||||
uint8_t flag; // page flag
|
||||
int32_t vlen; // value length of current page, TDB_VARIANT_LEN for variant length
|
||||
uint16_t nPayloads; // number of total payloads
|
||||
|
@ -36,6 +37,7 @@ typedef struct __attribute__((__packed__)) {
|
|||
pgoff_t offPayload; // payload offset
|
||||
pgno_t rChildPgno; // right most child page number
|
||||
} SBtPgHdr;
|
||||
#pragma pack(pop)
|
||||
|
||||
typedef int (*BtreeCmprFn)(const void *, const void *);
|
||||
|
||||
|
|
|
@ -30,11 +30,7 @@ struct STDbEnv {
|
|||
} pgfht; // page file hash table;
|
||||
};
|
||||
|
||||
#define TDB_ENV_PGF_HASH(fileid) \
|
||||
({ \
|
||||
uint8_t *tmp = (uint8_t *)(fileid); \
|
||||
tmp[0] + tmp[1] + tmp[2]; \
|
||||
})
|
||||
#define TDB_ENV_PGF_HASH(fileid) (((uint8_t *)(fileid))[0] + ((uint8_t *)(fileid))[1] + ((uint8_t *)(fileid))[2])
|
||||
|
||||
static int tdbEnvDestroy(TENV *pEnv);
|
||||
|
||||
|
|
|
@ -106,11 +106,7 @@ int pgCacheClose(SPgCache *pPgCache) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#define PG_CACHE_HASH(fileid, pgno) \
|
||||
({ \
|
||||
uint64_t *tmp = (uint64_t *)(fileid); \
|
||||
(tmp[0] + tmp[1] + tmp[2] + (pgno)); \
|
||||
})
|
||||
#define PG_CACHE_HASH(fileid, pgno) (((uint64_t *)(fileid))[0] + ((uint64_t *)(fileid))[1] + ((uint64_t *)(fileid))[2] + (pgno))
|
||||
|
||||
SPage *pgCacheFetch(SPgCache *pPgCache, pgid_t pgid) {
|
||||
SPage * pPage;
|
||||
|
|
|
@ -20,13 +20,15 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct __attribute__((__packed__)) {
|
||||
#pragma pack (push,1)
|
||||
typedef struct {
|
||||
char hdrInfo[16]; // info string
|
||||
pgsz_t szPage; // page size of current file
|
||||
int32_t cno; // commit number counter
|
||||
pgno_t freePgno; // freelist page number
|
||||
uint8_t resv[100]; // reserved space
|
||||
} SPgFileHdr;
|
||||
#pragma pack(pop)
|
||||
|
||||
#define TDB_PG_FILE_HDR_SIZE 128
|
||||
|
||||
|
|
|
@ -202,6 +202,11 @@ void tfsDirname(const STfsFile *pFile, char *dest) {
|
|||
tstrncpy(dest, taosDirName(tname), TSDB_FILENAME_LEN);
|
||||
}
|
||||
|
||||
void tfsAbsoluteName(STfs *pTfs, SDiskID diskId, const char *rname, char *aname) {
|
||||
STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId);
|
||||
snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
|
||||
}
|
||||
|
||||
int32_t tfsRemoveFile(const STfsFile *pFile) { return taosRemoveFile(pFile->aname); }
|
||||
|
||||
int32_t tfsCopyFile(const STfsFile *pFile1, const STfsFile *pFile2) {
|
||||
|
|
|
@ -207,8 +207,8 @@ void cliHandleResp(SCliConn* conn) {
|
|||
}
|
||||
|
||||
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
|
||||
TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
||||
inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
|
||||
TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
||||
taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
|
||||
|
||||
conn->secured = pHead->secured;
|
||||
|
||||
|
@ -519,8 +519,8 @@ void cliSend(SCliConn* pConn) {
|
|||
|
||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
||||
TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||
|
||||
pConn->writeReq.data = pConn;
|
||||
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
|
||||
|
|
|
@ -202,7 +202,7 @@ static void uvHandleReq(SSrvConn* pConn) {
|
|||
|
||||
STransMsgHead* pHead = (STransMsgHead*)p->msg;
|
||||
if (pHead->secured == 1) {
|
||||
STransUserMsg* uMsg = (p->msg + p->msgLen - sizeof(STransUserMsg));
|
||||
STransUserMsg* uMsg = (STransUserMsg*)((char*)p->msg + p->msgLen - sizeof(STransUserMsg));
|
||||
memcpy(pConn->user, uMsg->user, tListLen(uMsg->user));
|
||||
memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret));
|
||||
}
|
||||
|
@ -235,12 +235,12 @@ static void uvHandleReq(SSrvConn* pConn) {
|
|||
transRefSrvHandle(pConn);
|
||||
transMsg.handle = pConn;
|
||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
|
||||
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
|
||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
|
||||
ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
||||
} else {
|
||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, no resp ", pConn,
|
||||
TMSG_INFO(transMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
||||
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
||||
}
|
||||
|
||||
STrans* pTransInst = (STrans*)p->shandle;
|
||||
|
@ -347,7 +347,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
|
|||
// impl later
|
||||
}
|
||||
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType),
|
||||
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
|
||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
|
||||
ntohs(pConn->locaddr.sin_port));
|
||||
|
||||
pHead->msgLen = htonl(len);
|
||||
|
@ -374,8 +374,8 @@ static void uvStartSendResp(SSrvMsg* smsg) {
|
|||
transUnrefSrvHandle(pConn);
|
||||
|
||||
if (taosArrayGetSize(pConn->srvMsgs) > 0) {
|
||||
tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr),
|
||||
ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||
tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr),
|
||||
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||
taosArrayPush(pConn->srvMsgs, &smsg);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -19,9 +19,6 @@
|
|||
#include "tref.h"
|
||||
#include "walInt.h"
|
||||
|
||||
#include <libgen.h>
|
||||
#include <regex.h>
|
||||
|
||||
int64_t inline walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }
|
||||
|
||||
int64_t inline walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotVer; }
|
||||
|
|
|
@ -3,9 +3,10 @@ add_library(os STATIC ${OS_SRC})
|
|||
target_include_directories(
|
||||
os
|
||||
PUBLIC "${CMAKE_SOURCE_DIR}/include/os"
|
||||
PRIVATE "${CMAKE_SOURCE_DIR}/include"
|
||||
PRIVATE "${CMAKE_SOURCE_DIR}/include/util"
|
||||
PRIVATE "${CMAKE_SOURCE_DIR}/contrib/pthread-win32"
|
||||
PUBLIC "${CMAKE_SOURCE_DIR}/include"
|
||||
PUBLIC "${CMAKE_SOURCE_DIR}/include/util"
|
||||
PUBLIC "${CMAKE_SOURCE_DIR}/contrib/pthread-win32"
|
||||
PUBLIC "${CMAKE_SOURCE_DIR}/contrib/gnuregex"
|
||||
)
|
||||
target_link_libraries(
|
||||
os pthread dl rt m
|
||||
|
|
|
@ -427,7 +427,7 @@ int32_t compareWStrPatternMatch(const void *pLeft, const void *pRight) {
|
|||
wchar_t *pattern = calloc(varDataLen(pRight) + 1, sizeof(wchar_t));
|
||||
memcpy(pattern, varDataVal(pRight), varDataLen(pRight));
|
||||
|
||||
int32_t ret = WCSPatternMatch(pattern, varDataVal(pLeft), varDataLen(pLeft) / TSDB_NCHAR_SIZE, &pInfo);
|
||||
int32_t ret = WCSPatternMatch(pattern, (const wchar_t *)varDataVal(pLeft), varDataLen(pLeft) / TSDB_NCHAR_SIZE, &pInfo);
|
||||
free(pattern);
|
||||
|
||||
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
|
||||
|
|
|
@ -230,7 +230,7 @@ static int32_t tSStructA_v1_decode(SCoder *pCoder, SStructA_v1 *pSAV1) {
|
|||
const char *tstr;
|
||||
uint64_t len;
|
||||
if (tDecodeCStrAndLen(pCoder, &tstr, &len) < 0) return -1;
|
||||
pSAV1->A_c = (char *)TCODER_MALLOC(len + 1, pCoder);
|
||||
TCODER_MALLOC(pSAV1->A_c, char*, len + 1, pCoder);
|
||||
memcpy(pSAV1->A_c, tstr, len + 1);
|
||||
|
||||
tEndDecode(pCoder);
|
||||
|
@ -269,7 +269,7 @@ static int32_t tSStructA_v2_decode(SCoder *pCoder, SStructA_v2 *pSAV2) {
|
|||
const char *tstr;
|
||||
uint64_t len;
|
||||
if (tDecodeCStrAndLen(pCoder, &tstr, &len) < 0) return -1;
|
||||
pSAV2->A_c = (char *)TCODER_MALLOC(len + 1, pCoder);
|
||||
TCODER_MALLOC(pSAV2->A_c, char*, len + 1, pCoder);
|
||||
memcpy(pSAV2->A_c, tstr, len + 1);
|
||||
|
||||
// ------------------------NEW FIELDS DECODE-------------------------------
|
||||
|
@ -305,7 +305,7 @@ static int32_t tSFinalReq_v1_encode(SCoder *pCoder, const SFinalReq_v1 *ps1) {
|
|||
static int32_t tSFinalReq_v1_decode(SCoder *pCoder, SFinalReq_v1 *ps1) {
|
||||
if (tStartDecode(pCoder) < 0) return -1;
|
||||
|
||||
ps1->pA = (SStructA_v1 *)TCODER_MALLOC(sizeof(*(ps1->pA)), pCoder);
|
||||
TCODER_MALLOC(ps1->pA, SStructA_v1*, sizeof(*(ps1->pA)), pCoder);
|
||||
if (tSStructA_v1_decode(pCoder, ps1->pA) < 0) return -1;
|
||||
if (tDecodeI32(pCoder, &ps1->v_a) < 0) return -1;
|
||||
if (tDecodeI8(pCoder, &ps1->v_b) < 0) return -1;
|
||||
|
@ -339,7 +339,7 @@ static int32_t tSFinalReq_v2_encode(SCoder *pCoder, const SFinalReq_v2 *ps2) {
|
|||
static int32_t tSFinalReq_v2_decode(SCoder *pCoder, SFinalReq_v2 *ps2) {
|
||||
if (tStartDecode(pCoder) < 0) return -1;
|
||||
|
||||
ps2->pA = (SStructA_v2 *)TCODER_MALLOC(sizeof(*(ps2->pA)), pCoder);
|
||||
TCODER_MALLOC(ps2->pA, SStructA_v2*, sizeof(*(ps2->pA)), pCoder);
|
||||
if (tSStructA_v2_decode(pCoder, ps2->pA) < 0) return -1;
|
||||
if (tDecodeI32(pCoder, &ps2->v_a) < 0) return -1;
|
||||
if (tDecodeI8(pCoder, &ps2->v_b) < 0) return -1;
|
||||
|
|
|
@ -8,7 +8,8 @@ TEST(TD_UTIL_FREELIST_TEST, simple_test) {
|
|||
tFreeListInit(&fl);
|
||||
|
||||
for (size_t i = 0; i < 1000; i++) {
|
||||
void *ptr = TFL_MALLOC(1024, &fl);
|
||||
void *ptr = NULL;
|
||||
TFL_MALLOC(ptr, void*, 1024, &fl);
|
||||
GTEST_ASSERT_NE(ptr, nullptr);
|
||||
}
|
||||
|
||||
|
|
|
@ -378,7 +378,7 @@ class TdeSubProcess:
|
|||
@classmethod
|
||||
def _stopForSure(cls, proc: Popen, sig: int):
|
||||
'''
|
||||
Stop a process and all sub processes with a singal, and SIGKILL if necessary
|
||||
Stop a process and all sub processes with a signal, and SIGKILL if necessary
|
||||
'''
|
||||
def doKillTdService(proc: Popen, sig: int):
|
||||
Logging.info("Killing sub-sub process {} with signal {}".format(proc.pid, sig))
|
||||
|
|
Loading…
Reference in New Issue