Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/meta

This commit is contained in:
Hongze Cheng 2022-05-18 06:44:47 +00:00
commit eb6612887a
54 changed files with 1061 additions and 704 deletions

View File

@ -29,27 +29,42 @@ extern "C" {
typedef struct SSchema SSchema;
typedef struct STColumn STColumn;
typedef struct STSchema STSchema;
typedef struct SColVal SColVal;
typedef struct STSRow2 STSRow2;
typedef struct STSRowBuilder STSRowBuilder;
typedef struct SKVIdx SKVIdx;
// STSchema
// STSRow2
int32_t tEncodeTSRow(SEncoder *pEncoder, const STSRow2 *pRow);
int32_t tDecodeTSRow(SDecoder *pDecoder, STSRow2 *pRow);
typedef struct STagVal STagVal;
typedef struct STag STag;
// STSchema
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t nCols, STSchema **ppTSchema);
void tTSchemaDestroy(STSchema *pTSchema);
// SColVal
#define ColValNONE ((SColVal){.type = COL_VAL_NONE, .nData = 0, .pData = NULL})
#define ColValNULL ((SColVal){.type = COL_VAL_NULL, .nData = 0, .pData = NULL})
#define ColValDATA(nData, pData) ((SColVal){.type = COL_VAL_DATA, .nData = (nData), .pData = (pData)})
// STSRow2
int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow);
int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow);
int32_t tTSRowDup(const STSRow2 *pRow, STSRow2 **ppRow);
void tTSRowFree(STSRow2 *pRow);
int32_t tTSRowGet(const STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
// STSRowBuilder
int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, SSchema *pSchema, int32_t nCols);
int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, int32_t nCols, SSchema *pSchema);
void tTSRowBuilderClear(STSRowBuilder *pBuilder);
void tTSRowBuilderReset(STSRowBuilder *pBuilder);
int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, const uint8_t *pData, uint32_t nData);
int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, uint32_t nData);
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow);
// STag
int32_t tTagNew(STagVal *pTagVals, int16_t nTag, STag **ppTag);
void tTagFree(STag *pTag);
void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, int32_t *nData);
int32_t tEncodeTag(SEncoder *pEncoder, STag *pTag);
int32_t tDecodeTag(SDecoder *pDecoder, const STag **ppTag);
// STRUCT =================
struct STColumn {
col_id_t colId;
@ -68,31 +83,47 @@ struct STSchema {
STColumn columns[];
};
#define TSROW_HAS_NONE ((uint8_t)0x1)
#define TSROW_HAS_NULL ((uint8_t)0x2U)
#define TSROW_HAS_VAL ((uint8_t)0x4U)
#define TSROW_KV_ROW ((uint8_t)0x10U)
struct STSRow2 {
TSKEY ts;
uint32_t flags;
union {
uint8_t flags;
int32_t sver;
int32_t ncols;
};
uint32_t nData;
const uint8_t *pData;
uint8_t *pData;
};
struct STSRowBuilder {
STColumn *pTColumn;
STSchema *pTSchema;
int32_t szBitMap1;
int32_t szBitMap2;
int32_t szKVBuf;
uint8_t *pKVBuf;
int32_t szTPBuf;
uint8_t *pTPBuf;
int32_t nCols;
int32_t kvVLen;
int32_t tpVLen;
int32_t iCol;
int32_t vlenKV;
int32_t vlenTP;
STSRow2 row;
};
#if 1 //====================================
typedef enum { COL_VAL_NONE = 0, COL_VAL_NULL = 1, COL_VAL_DATA = 2 } EColValT;
struct SColVal {
EColValT type;
uint32_t nData;
uint8_t *pData;
};
struct STagVal {
int16_t cid;
int8_t type;
uint32_t nData;
uint8_t *pData;
};
#if 1 //================================================================================================================================================
// Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap.
#define TD_SUPPORT_BITMAP
#define TD_SUPPORT_READ2

View File

@ -277,8 +277,8 @@ void tFreeSSubmitRsp(SSubmitRsp* pRsp);
#define COL_SMA_ON ((int8_t)0x1)
#define COL_IDX_ON ((int8_t)0x2)
#define COL_VAL_SET ((int8_t)0x4)
#define COL_SET_NULL ((int8_t)0x10)
#define COL_SET_VAL ((int8_t)0x20)
typedef struct SSchema {
int8_t type;
int8_t flags;
@ -287,6 +287,9 @@ typedef struct SSchema {
char name[TSDB_COL_NAME_LEN];
} SSchema;
#define COL_IS_SET(FLG) ((FLG) & (COL_SET_VAL | COL_SET_NULL) != 0)
#define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL)))
#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON)
#define SSCHMEA_TYPE(s) ((s)->type)

View File

@ -41,8 +41,8 @@ typedef enum {
typedef int32_t (*PutToQueueFp)(void* pMgmt, SRpcMsg* pMsg);
typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg);
typedef void (*SendRspFp)(const SRpcMsg* pMsg);
typedef void (*SendRedirectRspFp)(const SRpcMsg* pMsg, const SEpSet* pNewEpSet);
typedef void (*SendRspFp)(SRpcMsg* pMsg);
typedef void (*SendRedirectRspFp)(SRpcMsg* pMsg, const SEpSet* pNewEpSet);
typedef void (*RegisterBrokenLinkArgFp)(SRpcMsg* pMsg);
typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type);
typedef void (*ReportStartup)(const char* name, const char* desc);
@ -64,8 +64,8 @@ void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb);
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg);
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg);
void tmsgSendRsp(const SRpcMsg* pMsg);
void tmsgSendRedirectRsp(const SRpcMsg* pMsg, const SEpSet* pNewEpSet);
void tmsgSendRsp(SRpcMsg* pMsg);
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet);
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg);
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type);
void tmsgReportStartup(const char* name, const char* desc);

View File

@ -456,52 +456,189 @@ static FORCE_INLINE void* tDecoderMalloc(SDecoder* pCoder, int32_t size) {
return p;
}
static FORCE_INLINE int32_t tPutBinary(uint8_t* p, const uint8_t* pData, uint32_t nData) {
// ===========================================
#define tPutV(p, v) \
do { \
int32_t n = 0; \
for (;;) { \
if (v <= 0x7f) { \
if (p) p[n] = v; \
n++; \
break; \
} \
if (p) p[n] = (v & 0x7f) | 0x80; \
n++; \
v >>= 7; \
} \
return n; \
} while (0)
#define tGetV(p, v) \
do { \
int32_t n = 0; \
if (v) *v = 0; \
for (;;) { \
if (p[n] <= 0x7f) { \
if (v) (*v) |= (p[n] << (7 * n)); \
n++; \
break; \
} \
if (v) (*v) |= ((p[n] & 0x7f) << (7 * n)); \
n++; \
} \
return n; \
} while (0)
// PUT
static FORCE_INLINE int32_t tPutU8(uint8_t* p, uint8_t v) {
if (p) ((uint8_t*)p)[0] = v;
return sizeof(uint8_t);
}
static FORCE_INLINE int32_t tPutI8(uint8_t* p, int8_t v) {
if (p) ((int8_t*)p)[0] = v;
return sizeof(int8_t);
}
static FORCE_INLINE int32_t tPutU16(uint8_t* p, uint16_t v) {
if (p) ((uint16_t*)p)[0] = v;
return sizeof(uint16_t);
}
static FORCE_INLINE int32_t tPutI16(uint8_t* p, int16_t v) {
if (p) ((int16_t*)p)[0] = v;
return sizeof(int16_t);
}
static FORCE_INLINE int32_t tPutU32(uint8_t* p, uint32_t v) {
if (p) ((uint32_t*)p)[0] = v;
return sizeof(uint32_t);
}
static FORCE_INLINE int32_t tPutI32(uint8_t* p, int32_t v) {
if (p) ((int32_t*)p)[0] = v;
return sizeof(int32_t);
}
static FORCE_INLINE int32_t tPutU64(uint8_t* p, uint64_t v) {
if (p) ((uint64_t*)p)[0] = v;
return sizeof(uint64_t);
}
static FORCE_INLINE int32_t tPutI64(uint8_t* p, int64_t v) {
if (p) ((int64_t*)p)[0] = v;
return sizeof(int64_t);
}
static FORCE_INLINE int32_t tPutU16v(uint8_t* p, uint16_t v) { tPutV(p, v); }
static FORCE_INLINE int32_t tPutI16v(uint8_t* p, int16_t v) { return tPutU16v(p, ZIGZAGE(int16_t, v)); }
static FORCE_INLINE int32_t tPutU32v(uint8_t* p, uint32_t v) { tPutV(p, v); }
static FORCE_INLINE int32_t tPutI32v(uint8_t* p, int32_t v) { return tPutU32v(p, ZIGZAGE(int32_t, v)); }
static FORCE_INLINE int32_t tPutU64v(uint8_t* p, uint64_t v) { tPutV(p, v); }
static FORCE_INLINE int32_t tPutI64v(uint8_t* p, int64_t v) { return tPutU64v(p, ZIGZAGE(int64_t, v)); }
// GET
static FORCE_INLINE int32_t tGetU8(uint8_t* p, uint8_t* v) {
if (v) *v = ((uint8_t*)p)[0];
return sizeof(uint8_t);
}
static FORCE_INLINE int32_t tGetI8(uint8_t* p, int8_t* v) {
if (v) *v = ((int8_t*)p)[0];
return sizeof(int8_t);
}
static FORCE_INLINE int32_t tGetU16(uint8_t* p, uint16_t* v) {
if (v) *v = ((uint16_t*)p)[0];
return sizeof(uint16_t);
}
static FORCE_INLINE int32_t tGetI16(uint8_t* p, int16_t* v) {
if (v) *v = ((int16_t*)p)[0];
return sizeof(int16_t);
}
static FORCE_INLINE int32_t tGetU32(uint8_t* p, uint32_t* v) {
if (v) *v = ((uint32_t*)p)[0];
return sizeof(uint32_t);
}
static FORCE_INLINE int32_t tGetI32(uint8_t* p, int32_t* v) {
if (v) *v = ((int32_t*)p)[0];
return sizeof(int32_t);
}
static FORCE_INLINE int32_t tGetU64(uint8_t* p, uint64_t* v) {
if (v) *v = ((uint64_t*)p)[0];
return sizeof(uint64_t);
}
static FORCE_INLINE int32_t tGetI64(uint8_t* p, int64_t* v) {
if (v) *v = ((int64_t*)p)[0];
return sizeof(int64_t);
}
static FORCE_INLINE int32_t tGetU16v(uint8_t* p, uint16_t* v) { tGetV(p, v); }
static FORCE_INLINE int32_t tGetI16v(uint8_t* p, int16_t* v) {
int32_t n;
uint16_t tv;
n = tGetU16v(p, &tv);
if (v) *v = ZIGZAGD(int16_t, tv);
return n;
}
static FORCE_INLINE int32_t tGetU32v(uint8_t* p, uint32_t* v) { tGetV(p, v); }
static FORCE_INLINE int32_t tGetI32v(uint8_t* p, int32_t* v) {
int32_t n;
uint32_t tv;
n = tGetU32v(p, &tv);
if (v) *v = ZIGZAGD(int32_t, tv);
return n;
}
static FORCE_INLINE int32_t tGetU64v(uint8_t* p, uint64_t* v) { tGetV(p, v); }
static FORCE_INLINE int32_t tGetI64v(uint8_t* p, int64_t* v) {
int32_t n;
uint64_t tv;
n = tGetU64v(p, &tv);
if (v) *v = ZIGZAGD(int64_t, tv);
return n;
}
// =====================
static FORCE_INLINE int32_t tPutBinary(uint8_t* p, uint8_t* pData, uint32_t nData) {
int n = 0;
uint32_t v = nData;
for (;;) {
if (v <= 0x7f) {
if (p) p[n] = v;
n++;
break;
}
if (p) p[n] = (v & 0x7f) | 0x80;
n++;
v >>= 7;
}
if (p) {
memcpy(p + n, pData, nData);
}
n += tPutU32v(p ? p + n : p, nData);
if (p) memcpy(p + n, pData, nData);
n += nData;
return n;
}
static FORCE_INLINE int32_t tGetBinary(const uint8_t* p, const uint8_t** ppData, uint32_t* nData) {
static FORCE_INLINE int32_t tGetBinary(uint8_t* p, uint8_t** ppData, uint32_t* nData) {
int32_t n = 0;
uint32_t tv = 0;
uint32_t t;
uint32_t nt;
for (;;) {
if (p[n] <= 0x7f) {
t = p[n];
tv |= (t << (7 * n));
n++;
break;
}
t = p[n] & 0x7f;
tv |= (t << (7 * n));
n++;
}
if (nData) *nData = n;
n += tGetU32v(p, &nt);
if (nData) *nData = nt;
if (ppData) *ppData = p + n;
n += nt;
n += tv;
return n;
}

View File

@ -714,25 +714,31 @@ static bool smlIsNchar(const char *pVal, uint16_t len) {
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
char *endPtr = NULL;
double ts = (double)strtoll(value, &endPtr, 10);
int64_t tsInt64 = strtoll(value, &endPtr, 10);
if(value + len != endPtr){
return -1;
}
double ts = tsInt64;
switch (type) {
case TSDB_TIME_PRECISION_HOURS:
ts *= (3600 * 1e9);
tsInt64 *= (3600 * 1e9);
break;
case TSDB_TIME_PRECISION_MINUTES:
ts *= (60 * 1e9);
tsInt64 *= (60 * 1e9);
break;
case TSDB_TIME_PRECISION_SECONDS:
ts *= (1e9);
tsInt64 *= (1e9);
break;
case TSDB_TIME_PRECISION_MILLI:
ts *= (1e6);
tsInt64 *= (1e6);
break;
case TSDB_TIME_PRECISION_MICRO:
ts *= (1e3);
tsInt64 *= (1e3);
break;
case TSDB_TIME_PRECISION_NANO:
break;
@ -743,7 +749,7 @@ static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
return -1;
}
return (int64_t)ts;
return tsInt64;
}
static int64_t smlGetTimeNow(int8_t precision) {

View File

@ -1188,3 +1188,36 @@ TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) {
destroyRequest(request);
smlDestroyInfo(info);
}
TEST(testCase, sml_TD15662_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES *pRes = taos_query(taos, "create database if not exists db_15662 precision 'ns'");
taos_free_result(pRes);
pRes = taos_query(taos, "use db_15662");
taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr);
const char *sql[] = {
"iyyyje,id=iyyyje_41943_1303,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=false,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000",
};
int ret = smlProcess(info, (char **)sql, sizeof(sql) / sizeof(sql[0]));
ASSERT_EQ(ret, 0);
// case 1
TAOS_RES *res = taos_query(taos, "select * from t_a5615048edae55218a22a149edebdc82");
ASSERT_NE(res, nullptr);
TAOS_ROW row = taos_fetch_row(res);
int64_t ts = *(int64_t*)row[0];
ASSERT_EQ(ts, 1626006833639000000);
taos_free_result(res);
}

View File

@ -19,37 +19,209 @@
#include "tdatablock.h"
#include "tlog.h"
#define TD_KV_ROW 0x1U
struct SKVIdx {
typedef struct SKVIdx {
int32_t cid;
int32_t offset;
} SKVIdx;
#pragma pack(push, 1)
typedef struct {
int16_t nCols;
SKVIdx idx[];
} STSKVRow;
#pragma pack(pop)
typedef struct STagIdx {
int16_t cid;
uint16_t offset;
} STagIdx;
#pragma pack(push, 1)
struct STag {
uint16_t len;
uint16_t nTag;
STagIdx idx[];
};
#pragma pack(pop)
int32_t tEncodeTSRow(SEncoder *pEncoder, const STSRow2 *pRow) {
if (tEncodeI64(pEncoder, pRow->ts) < 0) return -1;
if (tEncodeU32v(pEncoder, pRow->flags) < 0) return -1;
if (pRow->flags & TD_KV_ROW) {
if (tEncodeI32v(pEncoder, pRow->ncols) < 0) return -1;
} else {
if (tEncodeI32v(pEncoder, pRow->sver) < 0) return -1;
#define TSROW_IS_KV_ROW(r) ((r)->flags & TSROW_KV_ROW)
#define BIT1_SIZE(n) (((n)-1) / 8 + 1)
#define BIT2_SIZE(n) (((n)-1) / 4 + 1)
#define SET_BIT1(p, i, v) ((p)[(i) / 8] = (p)[(i) / 8] & (~(((uint8_t)1) << ((i) % 8))) | ((v) << ((i) % 8)))
#define SET_BIT2(p, i, v) ((p)[(i) / 4] = (p)[(i) / 4] & (~(((uint8_t)3) << ((i) % 4))) | ((v) << ((i) % 4)))
#define GET_BIT1(p, i) (((p)[(i) / 8] >> ((i) % 8)) & ((uint8_t)1))
#define GET_BIT2(p, i) (((p)[(i) / 4] >> ((i) % 4)) & ((uint8_t)3))
static FORCE_INLINE int tSKVIdxCmprFn(const void *p1, const void *p2);
// STSRow2
int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow) {
int32_t n = 0;
n += tPutI64(p ? p + n : p, pRow->ts);
n += tPutI8(p ? p + n : p, pRow->flags);
n += tPutI32v(p ? p + n : p, pRow->sver);
ASSERT(pRow->flags & 0xf);
switch (pRow->flags & 0xf) {
case TSROW_HAS_NONE:
case TSROW_HAS_NULL:
break;
default:
n += tPutBinary(p ? p + n : p, pRow->pData, pRow->nData);
break;
}
if (tEncodeBinary(pEncoder, pRow->pData, pRow->nData) < 0) return -1;
return n;
}
int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow) {
int32_t n = 0;
uint8_t flags;
n += tGetI64(p + n, pRow ? &pRow->ts : NULL);
n += tGetI8(p + n, pRow ? &pRow->flags : &flags);
n += tGetI32v(p + n, pRow ? &pRow->sver : NULL);
if (pRow) flags = pRow->flags;
switch (flags & 0xf) {
case TSROW_HAS_NONE:
case TSROW_HAS_NULL:
break;
default:
n += tGetBinary(p + n, pRow ? &pRow->pData : NULL, pRow ? &pRow->nData : NULL);
break;
}
return n;
}
int32_t tTSRowDup(const STSRow2 *pRow, STSRow2 **ppRow) {
(*ppRow) = taosMemoryMalloc(sizeof(*pRow) + pRow->nData);
if (*ppRow == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
(*ppRow)->ts = pRow->ts;
(*ppRow)->flags = pRow->flags;
(*ppRow)->sver = pRow->sver;
(*ppRow)->nData = pRow->nData;
if (pRow->nData) {
(*ppRow)->pData = (uint8_t *)(&(*ppRow)[1]);
memcpy((*ppRow)->pData, pRow->pData, pRow->nData);
} else {
(*ppRow)->pData = NULL;
}
return 0;
}
int32_t tDecodeTSRow(SDecoder *pDecoder, STSRow2 *pRow) {
if (tDecodeI64(pDecoder, &pRow->ts) < 0) return -1;
if (tDecodeU32v(pDecoder, &pRow->flags) < 0) return -1;
if (pRow->flags & TD_KV_ROW) {
if (tDecodeI32v(pDecoder, &pRow->ncols) < 0) return -1;
} else {
if (tDecodeI32v(pDecoder, &pRow->sver) < 0) return -1;
void tTSRowFree(STSRow2 *pRow) {
if (pRow) taosMemoryFree(pRow);
}
if (tDecodeBinary(pDecoder, &pRow->pData, &pRow->nData) < 0) return -1;
int32_t tTSRowGet(const STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
uint32_t n;
uint8_t *p;
uint8_t v;
int32_t bidx = iCol - 1;
STColumn *pTColumn = &pTSchema->columns[iCol];
STSKVRow *pTSKVRow;
SKVIdx *pKVIdx;
ASSERT(iCol != 0);
ASSERT(pTColumn->colId != 0);
ASSERT(pRow->flags & 0xf != 0);
switch (pRow->flags & 0xf) {
case TSROW_HAS_NONE:
*pColVal = ColValNONE;
return 0;
case TSROW_HAS_NULL:
*pColVal = ColValNULL;
return 0;
}
if (TSROW_IS_KV_ROW(pRow)) {
ASSERT((pRow->flags & 0xf) != TSROW_HAS_VAL);
pTSKVRow = (STSKVRow *)pRow->pData;
pKVIdx =
bsearch(&((SKVIdx){.cid = pTColumn->colId}), pTSKVRow->idx, pTSKVRow->nCols, sizeof(SKVIdx), tSKVIdxCmprFn);
if (pKVIdx == NULL) {
*pColVal = ColValNONE;
} else if (pKVIdx->offset < 0) {
*pColVal = ColValNULL;
} else {
p = pRow->pData + sizeof(STSKVRow) + sizeof(SKVIdx) * pTSKVRow->nCols + pKVIdx->offset;
pColVal->type = COL_VAL_DATA;
tGetBinary(p, &pColVal->pData, &pColVal->nData);
}
} else {
// get bitmap
p = pRow->pData;
switch (pRow->flags & 0xf) {
case TSROW_HAS_NULL | TSROW_HAS_NONE:
v = GET_BIT1(p, bidx);
if (v == 0) {
*pColVal = ColValNONE;
} else {
*pColVal = ColValNULL;
}
return 0;
case TSROW_HAS_VAL | TSROW_HAS_NONE:
v = GET_BIT1(p, bidx);
if (v == 1) {
p = p + BIT1_SIZE(pTSchema->numOfCols - 1);
break;
} else {
*pColVal = ColValNONE;
return 0;
}
case TSROW_HAS_VAL | TSROW_HAS_NULL:
v = GET_BIT1(p, bidx);
if (v == 1) {
p = p + BIT1_SIZE(pTSchema->numOfCols - 1);
break;
} else {
*pColVal = ColValNULL;
return 0;
}
case TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE:
v = GET_BIT2(p, bidx);
if (v == 0) {
*pColVal = ColValNONE;
return 0;
} else if (v == 1) {
*pColVal = ColValNULL;
return 0;
} else if (v == 2) {
p = p + BIT2_SIZE(pTSchema->numOfCols - 1);
break;
} else {
ASSERT(0);
}
default:
break;
}
// get real value
p = p + pTColumn->offset;
pColVal->type = COL_VAL_DATA;
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
tGetBinary(p + pTSchema->flen + *(int32_t *)p, &pColVal->pData, &pColVal->nData);
} else {
pColVal->pData = p;
pColVal->nData = pTColumn->bytes;
}
}
return 0;
}
// STSchema
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t ncols, STSchema **ppTSchema) {
*ppTSchema = (STSchema *)taosMemoryMalloc(sizeof(STSchema) + sizeof(STColumn) * ncols);
if (*ppTSchema == NULL) {
@ -85,170 +257,360 @@ int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t ncols, STSchema *
return 0;
}
void tTSchemaDestroy(STSchema *pTSchema) { taosMemoryFree(pTSchema); }
int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, SSchema *pSchema, int32_t nCols) {
int32_t kvBufLen;
int32_t tpBufLen;
uint8_t *p;
void tTSchemaDestroy(STSchema *pTSchema) {
if (pTSchema) taosMemoryFree(pTSchema);
}
// STSRowBuilder
int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, int32_t nCols, SSchema *pSchema) {
if (tTSchemaCreate(sver, pSchema, nCols, &pBuilder->pTSchema) < 0) return -1;
kvBufLen = sizeof(SKVIdx) * nCols + pBuilder->pTSchema->flen + pBuilder->pTSchema->vlen;
tpBufLen = pBuilder->pTSchema->flen + pBuilder->pTSchema->vlen;
if (pBuilder->szKVBuf < kvBufLen) {
p = taosMemoryRealloc(pBuilder->pKVBuf, kvBufLen);
if (p == NULL) {
pBuilder->szBitMap1 = BIT1_SIZE(nCols - 1);
pBuilder->szBitMap2 = BIT2_SIZE(nCols - 1);
pBuilder->szKVBuf =
sizeof(STSKVRow) + sizeof(SKVIdx) * (nCols - 1) + pBuilder->pTSchema->flen + pBuilder->pTSchema->vlen;
pBuilder->szTPBuf = pBuilder->szBitMap2 + pBuilder->pTSchema->flen + pBuilder->pTSchema->vlen;
pBuilder->pKVBuf = taosMemoryMalloc(pBuilder->szKVBuf);
if (pBuilder->pKVBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tTSchemaDestroy(pBuilder->pTSchema);
return -1;
}
pBuilder->pKVBuf = p;
pBuilder->szKVBuf = kvBufLen;
}
if (pBuilder->szTPBuf < tpBufLen) {
p = taosMemoryRealloc(pBuilder->pTPBuf, tpBufLen);
if (p == NULL) {
pBuilder->pTPBuf = taosMemoryMalloc(pBuilder->szTPBuf);
if (pBuilder->pTPBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pBuilder->pKVBuf);
tTSchemaDestroy(pBuilder->pTSchema);
return -1;
}
pBuilder->pTPBuf = p;
pBuilder->szTPBuf = tpBufLen;
}
tTSRowBuilderReset(pBuilder);
return 0;
}
void tTSRowBuilderClear(STSRowBuilder *pBuilder) {
taosMemoryFree(pBuilder->pKVBuf);
if (pBuilder->pTPBuf) {
taosMemoryFree(pBuilder->pTPBuf);
pBuilder->pTPBuf = NULL;
}
if (pBuilder->pKVBuf) {
taosMemoryFree(pBuilder->pKVBuf);
pBuilder->pKVBuf = NULL;
}
tTSchemaDestroy(pBuilder->pTSchema);
pBuilder->pTSchema = NULL;
}
void tTSRowBuilderReset(STSRowBuilder *pBuilder) {
for (int32_t iCol = pBuilder->pTSchema->numOfCols - 1; iCol >= 0; iCol--) {
pBuilder->pTColumn = &pBuilder->pTSchema->columns[iCol];
pBuilder->pTColumn->flags &= (~COL_VAL_SET);
STColumn *pTColumn = &pBuilder->pTSchema->columns[iCol];
COL_CLR_SET(pTColumn->flags);
}
pBuilder->nCols = 0;
pBuilder->kvVLen = 0;
pBuilder->tpVLen = 0;
pBuilder->iCol = 0;
((STSKVRow *)pBuilder->pKVBuf)->nCols = 0;
pBuilder->vlenKV = 0;
pBuilder->vlenTP = 0;
pBuilder->row.flags = 0;
}
int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, const uint8_t *pData, uint32_t nData) {
int32_t iCol;
int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, uint32_t nData) {
STColumn *pTColumn = &pBuilder->pTSchema->columns[pBuilder->iCol];
uint8_t *p;
int32_t iCol;
STSKVRow *pTSKVRow = (STSKVRow *)pBuilder->pKVBuf;
// search column
if (pBuilder->pTColumn->colId < cid) {
iCol = (pBuilder->pTColumn - pBuilder->pTSchema->columns) / sizeof(STColumn) + 1;
for (; iCol < pBuilder->pTSchema->numOfCols; iCol++) {
pBuilder->pTColumn = &pBuilder->pTSchema->columns[iCol];
if (pBuilder->pTColumn->colId == cid) break;
// use interp search
if (pTColumn->colId < cid) { // right search
for (iCol = pBuilder->iCol + 1; iCol < pBuilder->pTSchema->numOfCols; iCol++) {
pTColumn = &pBuilder->pTSchema->columns[iCol];
if (pTColumn->colId >= cid) break;
}
} else if (pBuilder->pTColumn->colId > cid) {
iCol = (pBuilder->pTColumn - pBuilder->pTSchema->columns) / sizeof(STColumn) - 1;
for (; iCol >= 0; iCol--) {
pBuilder->pTColumn = &pBuilder->pTSchema->columns[iCol];
if (pBuilder->pTColumn->colId == cid) break;
} else if (pTColumn->colId > cid) { // left search
for (iCol = pBuilder->iCol - 1; iCol >= 0; iCol--) {
pTColumn = &pBuilder->pTSchema->columns[iCol];
if (pTColumn->colId <= cid) break;
}
}
// check
if (pBuilder->pTColumn->colId != cid || pBuilder->pTColumn->flags & COL_VAL_SET) {
if (pTColumn->colId != cid || COL_IS_SET(pTColumn->flags)) {
return -1;
}
pBuilder->iCol = iCol;
// set value
if (cid == 0) {
ASSERT(pData && nData == sizeof(TSKEY));
ASSERT(pData && nData == sizeof(TSKEY) && iCol == 0);
pBuilder->row.ts = *(TSKEY *)pData;
pTColumn->flags |= COL_SET_VAL;
} else {
if (pData) {
// ASSERT(!IS_NULL(pData));
// set VAL
// set tuple data
p = pBuilder->pTPBuf + pBuilder->pTColumn->offset;
if (IS_VAR_DATA_TYPE(pBuilder->pTColumn->type)) {
*(int32_t *)p = pBuilder->tpVLen;
pBuilder->row.flags |= TSROW_HAS_VAL;
pTColumn->flags |= COL_SET_VAL;
// encode the variant-length data
p = pBuilder->pTPBuf + pBuilder->pTSchema->flen + pBuilder->tpVLen;
pBuilder->tpVLen += tPutBinary(p, pData, nData);
/* KV */
if (1) { // avoid KV at some threshold (todo)
pTSKVRow->idx[pTSKVRow->nCols].cid = cid;
pTSKVRow->idx[pTSKVRow->nCols].offset = pBuilder->vlenKV;
p = pBuilder->pKVBuf + sizeof(STSKVRow) + sizeof(SKVIdx) * (pBuilder->pTSchema->numOfCols - 1) +
pBuilder->vlenKV;
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
ASSERT(nData <= pTColumn->bytes);
pBuilder->vlenKV += tPutBinary(p, pData, nData);
} else {
ASSERT(nData == pTColumn->bytes);
memcpy(p, pData, nData);
pBuilder->vlenKV += nData;
}
}
/* TUPLE */
p = pBuilder->pTPBuf + pBuilder->szBitMap2 + pTColumn->offset;
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
ASSERT(nData <= pTColumn->bytes);
*(int32_t *)p = pBuilder->vlenTP;
p = pBuilder->pTPBuf + pBuilder->szBitMap2 + pBuilder->pTSchema->flen + pBuilder->vlenTP;
pBuilder->vlenTP += tPutBinary(p, pData, nData);
} else {
ASSERT(nData == pTColumn->bytes);
memcpy(p, pData, nData);
}
// set kv data
p = pBuilder->pKVBuf + sizeof(SKVIdx) * pBuilder->nCols;
((SKVIdx *)p)->cid = cid;
((SKVIdx *)p)->offset = pBuilder->kvVLen;
p = pBuilder->pKVBuf + sizeof(SKVIdx) * pBuilder->pTSchema->numOfCols + pBuilder->kvVLen;
if (IS_VAR_DATA_TYPE(pBuilder->pTColumn->type)) {
pBuilder->kvVLen += tPutBinary(p, pData, nData);
} else {
memcpy(p, pData, nData);
pBuilder->kvVLen += nData;
}
} else {
// set NULL val
}
// set NULL
pBuilder->row.flags |= TSROW_HAS_NULL;
pTColumn->flags |= COL_SET_NULL;
pTSKVRow->idx[pTSKVRow->nCols].cid = cid;
pTSKVRow->idx[pTSKVRow->nCols].offset = -1;
}
pTSKVRow->nCols++;
}
pBuilder->pTColumn->flags |= COL_VAL_SET;
pBuilder->nCols++;
return 0;
}
static FORCE_INLINE int tSKVIdxCmprFn(const void *p1, const void *p2) {
SKVIdx *pKVIdx1 = (SKVIdx *)p1;
SKVIdx *pKVIdx2 = (SKVIdx *)p2;
if (pKVIdx1->cid > pKVIdx2->cid) {
return 1;
} else if (pKVIdx1->cid < pKVIdx2->cid) {
return -1;
}
return 0;
}
static void setBitMap(uint8_t *p, STSchema *pTSchema, uint8_t flags) {
int32_t bidx;
STColumn *pTColumn;
for (int32_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
pTColumn = &pTSchema->columns[iCol];
bidx = iCol - 1;
switch (flags) {
case TSROW_HAS_NULL | TSROW_HAS_NONE:
if (pTColumn->flags & COL_SET_NULL) {
SET_BIT1(p, bidx, (uint8_t)1);
} else {
SET_BIT1(p, bidx, (uint8_t)0);
}
break;
case TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE:
if (pTColumn->flags & COL_SET_NULL) {
SET_BIT2(p, bidx, (uint8_t)1);
} else if (pTColumn->flags & COL_SET_VAL) {
SET_BIT2(p, bidx, (uint8_t)2);
} else {
SET_BIT2(p, bidx, (uint8_t)0);
}
break;
default:
if (pTColumn->flags & COL_SET_VAL) {
SET_BIT1(p, bidx, (uint8_t)1);
} else {
SET_BIT1(p, bidx, (uint8_t)0);
}
break;
}
}
}
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) {
if ((pBuilder->pTSchema->columns[0].flags & COL_VAL_SET) == 0) {
int32_t nDataTP, nDataKV;
uint32_t flags;
STSKVRow *pTSKVRow = (STSKVRow *)pBuilder->pKVBuf;
int32_t nCols = pBuilder->pTSchema->numOfCols;
// error not set ts
if (!COL_IS_SET(pBuilder->pTSchema->columns->flags)) {
return -1;
}
if (pBuilder->nCols * sizeof(SKVIdx) + pBuilder->kvVLen < pBuilder->pTSchema->flen + pBuilder->tpVLen) {
// encode as TD_KV_ROW
pBuilder->row.flags |= TD_KV_ROW;
pBuilder->row.ncols = pBuilder->nCols;
pBuilder->row.nData = pBuilder->nCols * sizeof(SKVIdx) + pBuilder->kvVLen;
ASSERT(pTSKVRow->nCols < nCols);
if (pTSKVRow->nCols < nCols - 1) {
pBuilder->row.flags |= TSROW_HAS_NONE;
}
ASSERT(pBuilder->row.flags & 0xf != 0);
*(ppRow) = &pBuilder->row;
switch (pBuilder->row.flags & 0xf) {
case TSROW_HAS_NONE:
case TSROW_HAS_NULL:
pBuilder->row.nData = 0;
pBuilder->row.pData = NULL;
return 0;
case TSROW_HAS_NULL | TSROW_HAS_NONE:
nDataTP = pBuilder->szBitMap1;
break;
case TSROW_HAS_VAL:
nDataTP = pBuilder->pTSchema->flen + pBuilder->vlenTP;
break;
case TSROW_HAS_VAL | TSROW_HAS_NONE:
case TSROW_HAS_VAL | TSROW_HAS_NULL:
nDataTP = pBuilder->szBitMap1 + pBuilder->pTSchema->flen + pBuilder->vlenTP;
break;
case TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE:
nDataTP = pBuilder->szBitMap2 + pBuilder->pTSchema->flen + pBuilder->vlenTP;
break;
default:
ASSERT(0);
}
nDataKV = sizeof(STSKVRow) + sizeof(SKVIdx) * pTSKVRow->nCols + pBuilder->vlenKV;
pBuilder->row.sver = pBuilder->pTSchema->version;
if (nDataKV < nDataTP) {
// generate KV row
ASSERT(pBuilder->row.flags & 0xf != TSROW_HAS_VAL);
pBuilder->row.flags |= TSROW_KV_ROW;
pBuilder->row.nData = nDataKV;
pBuilder->row.pData = pBuilder->pKVBuf;
if (pBuilder->nCols < pBuilder->pTSchema->numOfCols) {
memmove(pBuilder->pKVBuf + sizeof(SKVIdx) * pBuilder->nCols,
pBuilder->pKVBuf + sizeof(SKVIdx) * pBuilder->pTSchema->numOfCols, pBuilder->kvVLen);
qsort(pTSKVRow->idx, pTSKVRow->nCols, sizeof(SKVIdx), tSKVIdxCmprFn);
if (pTSKVRow->nCols < nCols - 1) {
memmove(&pTSKVRow->idx[pTSKVRow->nCols], &pTSKVRow->idx[nCols - 1], pBuilder->vlenKV);
}
} else {
// encode as TD_TUPLE_ROW
pBuilder->row.flags &= (~TD_KV_ROW);
pBuilder->row.sver = pBuilder->pTSchema->version;
pBuilder->row.nData = pBuilder->pTSchema->flen + pBuilder->tpVLen;
// generate TUPLE row
pBuilder->row.nData = nDataTP;
uint8_t *p;
uint8_t flags = pBuilder->row.flags & 0xf;
if (flags == TSROW_HAS_VAL) {
pBuilder->row.pData = pBuilder->pTPBuf + pBuilder->szBitMap2;
} else {
if (flags == TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE) {
pBuilder->row.pData = pBuilder->pTPBuf;
if (pBuilder->nCols < pBuilder->pTSchema->numOfCols) {
// set non-set cols as None
for (int32_t iCol = 1; iCol < pBuilder->pTSchema->numOfCols; iCol++) {
pBuilder->pTColumn = &pBuilder->pTSchema->columns[iCol];
if (pBuilder->pTColumn->flags & COL_VAL_SET) continue;
{
// set None (todo)
} else {
pBuilder->row.pData = pBuilder->pTPBuf + pBuilder->szBitMap2 - pBuilder->szBitMap1;
}
pBuilder->pTColumn->flags |= COL_VAL_SET;
}
setBitMap(pBuilder->row.pData, pBuilder->pTSchema, flags);
}
}
*ppRow = &pBuilder->row;
return 0;
}
#if 1 // ====================
static FORCE_INLINE int tTagIdxCmprFn(const void *p1, const void *p2) {
STagIdx *pTagIdx1 = (STagIdx *)p1;
STagIdx *pTagIdx2 = (STagIdx *)p2;
if (pTagIdx1->cid < pTagIdx1->cid) {
return -1;
} else if (pTagIdx1->cid > pTagIdx1->cid) {
return 1;
}
return 0;
}
int32_t tTagNew(STagVal *pTagVals, int16_t nTag, STag **ppTag) {
STagVal *pTagVal;
uint8_t *p;
int32_t n;
uint16_t tsize = sizeof(STag) + sizeof(STagIdx) * nTag;
for (int16_t iTag = 0; iTag < nTag; iTag++) {
pTagVal = &pTagVals[iTag];
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
tsize += tPutBinary(NULL, pTagVal->pData, pTagVal->nData);
} else {
ASSERT(pTagVal->nData == TYPE_BYTES[pTagVal->type]);
tsize += pTagVal->nData;
}
}
(*ppTag) = (STag *)taosMemoryMalloc(tsize);
if (*ppTag == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
p = (uint8_t *)&((*ppTag)->idx[nTag]);
n = 0;
(*ppTag)->len = tsize;
(*ppTag)->nTag = nTag;
for (int16_t iTag = 0; iTag < nTag; iTag++) {
pTagVal = &pTagVals[iTag];
(*ppTag)->idx[iTag].cid = pTagVal->cid;
(*ppTag)->idx[iTag].offset = n;
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
n += tPutBinary(p + n, pTagVal->pData, pTagVal->nData);
} else {
memcpy(p + n, pTagVal->pData, pTagVal->nData);
n += pTagVal->nData;
}
}
qsort((*ppTag)->idx, (*ppTag)->nTag, sizeof(STagIdx), tTagIdxCmprFn);
return 0;
}
void tTagFree(STag *pTag) {
if (pTag) taosMemoryFree(pTag);
}
void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, int32_t *nData) {
STagIdx *pTagIdx = bsearch(&((STagIdx){.cid = cid}), pTag->idx, pTag->nTag, sizeof(STagIdx), tTagIdxCmprFn);
if (pTagIdx == NULL) {
*ppData = NULL;
*nData = 0;
} else {
uint8_t *p = (uint8_t *)&pTag->idx[pTag->nTag] + pTagIdx->offset;
if (IS_VAR_DATA_TYPE(type)) {
tGetBinary(p, ppData, nData);
} else {
*ppData = p;
*nData = TYPE_BYTES[type];
}
}
}
int32_t tEncodeTag(SEncoder *pEncoder, STag *pTag) {
// return tEncodeBinary(pEncoder, (uint8_t *)pTag, pTag->len);
ASSERT(0);
return 0;
}
int32_t tDecodeTag(SDecoder *pDecoder, const STag **ppTag) {
// uint32_t n;
// return tDecodeBinary(pDecoder, (const uint8_t **)ppTag, &n);
ASSERT(0);
return 0;
}
#if 1 // ===================================================================================================================
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
int spaceNeeded = pCol->bytes * maxPoints;
@ -260,8 +622,8 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
spaceNeeded += (int)nBitmapBytes;
// TODO: Currently, the compression of bitmap parts is affiliated to the column data parts, thus allocate 1 more
// TYPE_BYTES as to comprise complete TYPE_BYTES. Otherwise, invalid read/write would be triggered.
// spaceNeeded += TYPE_BYTES[pCol->type]; // the bitmap part is append as a single part since 2022.04.03, thus remove
// the additional space
// spaceNeeded += TYPE_BYTES[pCol->type]; // the bitmap part is append as a single part since 2022.04.03, thus
// remove the additional space
#endif
if (pCol->spaceSize < spaceNeeded) {

View File

@ -21,9 +21,9 @@ static SMsgCb tsDefaultMsgCb;
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; }
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg) {
PutToQueueFp fp = pMsgCb->queueFps[qtype];
return (*fp)(pMsgCb->mgmt, pReq);
return (*fp)(pMsgCb->mgmt, pMsg);
}
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
@ -31,17 +31,17 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
return (*fp)(pMsgCb->mgmt, vgId, qtype);
}
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pReq) {
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
SendReqFp fp = tsDefaultMsgCb.sendReqFp;
return (*fp)(epSet, pReq);
return (*fp)(epSet, pMsg);
}
void tmsgSendRsp(const SRpcMsg* pMsg) {
void tmsgSendRsp(SRpcMsg* pMsg) {
SendRspFp fp = tsDefaultMsgCb.sendRspFp;
return (*fp)(pMsg);
}
void tmsgSendRedirectRsp(const SRpcMsg* pMsg, const SEpSet* pNewEpSet) {
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet) {
SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp;
(*fp)(pMsg, pNewEpSet);
}

View File

@ -374,39 +374,135 @@ char getPrecisionUnit(int32_t precision) {
}
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) {
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI ||
fromPrecision == TSDB_TIME_PRECISION_MICRO ||
fromPrecision == TSDB_TIME_PRECISION_NANO);
assert(toPrecision == TSDB_TIME_PRECISION_MILLI || toPrecision == TSDB_TIME_PRECISION_MICRO ||
assert(toPrecision == TSDB_TIME_PRECISION_MILLI ||
toPrecision == TSDB_TIME_PRECISION_MICRO ||
toPrecision == TSDB_TIME_PRECISION_NANO);
static double factors[3][3] = {{1., 1000., 1000000.}, {1.0 / 1000, 1., 1000.}, {1.0 / 1000000, 1.0 / 1000, 1.}};
return (int64_t)((double)time * factors[fromPrecision][toPrecision]);
double tempResult = (double)time;
switch(fromPrecision) {
case TSDB_TIME_PRECISION_MILLI: {
switch (toPrecision) {
case TSDB_TIME_PRECISION_MILLI:
return time;
case TSDB_TIME_PRECISION_MICRO:
tempResult *= 1000;
time *= 1000;
goto end_;
case TSDB_TIME_PRECISION_NANO:
tempResult *= 1000000;
time *= 1000000;
goto end_;
}
} // end from milli
case TSDB_TIME_PRECISION_MICRO: {
switch (toPrecision) {
case TSDB_TIME_PRECISION_MILLI:
return time / 1000;
case TSDB_TIME_PRECISION_MICRO:
return time;
case TSDB_TIME_PRECISION_NANO:
tempResult *= 1000;
time *= 1000;
goto end_;
}
} //end from micro
case TSDB_TIME_PRECISION_NANO: {
switch (toPrecision) {
case TSDB_TIME_PRECISION_MILLI:
return time / 1000000;
case TSDB_TIME_PRECISION_MICRO:
return time / 1000;
case TSDB_TIME_PRECISION_NANO:
return time;
}
} //end from nano
default: {
assert(0);
return time; // only to pass windows compilation
}
} //end switch fromPrecision
end_:
if (tempResult >= (double)INT64_MAX) return INT64_MAX;
if (tempResult <= (double)INT64_MIN) return INT64_MIN; // INT64_MIN means NULL
return time;
}
// !!!!notice:there are precision problems, double lose precison if time is too large, for example: 1626006833631000000*1.0 = double = 1626006833631000064
//int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) {
// assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
// fromPrecision == TSDB_TIME_PRECISION_NANO);
// assert(toPrecision == TSDB_TIME_PRECISION_MILLI || toPrecision == TSDB_TIME_PRECISION_MICRO ||
// toPrecision == TSDB_TIME_PRECISION_NANO);
// static double factors[3][3] = {{1., 1000., 1000000.}, {1.0 / 1000, 1., 1000.}, {1.0 / 1000000, 1.0 / 1000, 1.}};
// ((double)time * factors[fromPrecision][toPrecision]);
//}
// !!!!notice: double lose precison if time is too large, for example: 1626006833631000000*1.0 = double = 1626006833631000064
int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit) {
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
fromPrecision == TSDB_TIME_PRECISION_NANO);
static double factors[3] = {1000000., 1000., 1.};
int64_t factors[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
double tmp = time;
switch (toUnit) {
case 's':
return time * factors[fromPrecision] / NANOSECOND_PER_SEC;
case 's':{
tmp /= (NANOSECOND_PER_SEC/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_SEC/factors[fromPrecision]);
break;
}
case 'm':
return time * factors[fromPrecision] / NANOSECOND_PER_MINUTE;
tmp /= (NANOSECOND_PER_MINUTE/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_MINUTE/factors[fromPrecision]);
break;
case 'h':
return time * factors[fromPrecision] / NANOSECOND_PER_HOUR;
tmp /= (NANOSECOND_PER_HOUR/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_HOUR/factors[fromPrecision]);
break;
case 'd':
return time * factors[fromPrecision] / NANOSECOND_PER_DAY;
tmp /= (NANOSECOND_PER_DAY/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_DAY/factors[fromPrecision]);
break;
case 'w':
return time * factors[fromPrecision] / NANOSECOND_PER_WEEK;
tmp /= (NANOSECOND_PER_WEEK/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_WEEK/factors[fromPrecision]);
break;
case 'a':
return time * factors[fromPrecision] / NANOSECOND_PER_MSEC;
tmp /= (NANOSECOND_PER_MSEC/factors[fromPrecision]); // the result of division is an integer
time /= (NANOSECOND_PER_MSEC/factors[fromPrecision]);
break;
case 'u':
return time * factors[fromPrecision] / NANOSECOND_PER_USEC;
// the result of (NANOSECOND_PER_USEC/(double)factors[fromPrecision]) maybe a double
switch (fromPrecision) {
case TSDB_TIME_PRECISION_MILLI:{
tmp *= 1000;
time *= 1000;
break;
}
case TSDB_TIME_PRECISION_MICRO:{
tmp /= 1;
time /= 1;
break;
}
case TSDB_TIME_PRECISION_NANO:{
tmp /= 1000;
time /= 1000;
break;
}
}
break;
case 'b':
return time * factors[fromPrecision];
tmp *= factors[fromPrecision];
time *= factors[fromPrecision];
break;
default: {
return -1;
}
}
if (tmp >= (double)INT64_MAX) return INT64_MAX;
if (tmp <= (double)INT64_MIN) return INT64_MIN;
return time;
}
int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec, int64_t *timeVal) {

View File

@ -17,6 +17,15 @@ TARGET_INCLUDE_DIRECTORIES(
PRIVATE "${TD_SOURCE_DIR}/source/libs/common/inc"
)
# dataformatTest.cpp
add_executable(dataformatTest "")
target_sources(
dataformatTest
PRIVATE
"dataformatTest.cpp"
)
target_link_libraries(dataformatTest gtest gtest_main util)
# tmsg test
# add_executable(tmsgTest "")
# target_sources(tmsgTest

View File

@ -0,0 +1 @@
#include "gtest/gtest.h"

View File

@ -1,23 +0,0 @@
#include <gtest/gtest.h>
#include "trow.h"
TEST(td_row_test, build_row_to_target) {
#if 0
char dst[1024];
SRow* pRow = (SRow*)dst;
int ncols = 10;
col_id_t cid;
void* pData;
SRowBuilder rb = trbInit(TD_OR_ROW_BUILDER, NULL, 0, pRow, 1024);
trbSetRowInfo(&rb, false, 0);
trbSetRowTS(&rb, 1637550210000);
for (int c = 0; c < ncols; c++) {
cid = c;
if (trbWriteCol(&rb, pData, cid) < 0) {
// TODO
}
}
#endif
}

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -1,14 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

View File

@ -75,6 +75,7 @@ int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->statusThread)) {
taosThreadJoin(pMgmt->statusThread, NULL);
taosThreadClear(&pMgmt->statusThread);
}
}
@ -95,6 +96,7 @@ int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->monitorThread)) {
taosThreadJoin(pMgmt->monitorThread, NULL);
taosThreadClear(&pMgmt->monitorThread);
}
}

View File

@ -196,6 +196,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
SVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
taosThreadJoin(pThread->thread, NULL);
taosThreadClear(&pThread->thread);
}
taosMemoryFree(pThread->pCfgs);
}

View File

@ -151,12 +151,10 @@ int32_t dmInitProc(struct SMgmtWrapper *pWrapper);
void dmCleanupProc(struct SMgmtWrapper *pWrapper);
int32_t dmRunProc(SProc *proc);
void dmStopProc(SProc *proc);
int64_t dmRemoveProcRpcHandle(SProc *proc, void *handle);
void dmRemoveProcRpcHandle(SProc *proc, void *handle);
void dmCloseProcRpcHandles(SProc *proc);
int32_t dmPutToProcCQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, int64_t handleRef, EProcFuncType ftype);
void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
EProcFuncType ftype);
int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype);
void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype);
// dmTransport.c
int32_t dmInitServer(SDnode *pDnode);

View File

@ -16,10 +16,7 @@
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
static inline int32_t CEIL8(int32_t v) {
const int32_t c = ceil((float)(v) / 8) * 8;
return c < 8 ? 8 : c;
}
static inline int32_t CEIL8(int32_t v) { return ceil((float)(v) / 8) * 8; }
static int32_t dmInitProcMutex(SProcQueue *queue) {
TdThreadMutexAttr mattr = {0};
@ -87,42 +84,17 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) {
return queue;
}
#if 0
static void dmDestroyProcQueue(SProcQueue *queue) {
if (queue->mutex != NULL) {
taosThreadMutexDestroy(queue->mutex);
queue->mutex = NULL;
}
}
static void dmCleanupProcQueue(SProcQueue *queue) {}
static void dmDestroyProcSem(SProcQueue *queue) {
if (queue->sem != NULL) {
tsem_destroy(queue->sem);
queue->sem = NULL;
}
}
#endif
static void dmCleanupProcQueue(SProcQueue *queue) {
#if 0
if (queue != NULL) {
dmDestroyProcQueue(queue);
dmDestroyProcSem(queue);
}
#endif
}
static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHead, int16_t rawHeadLen,
const char *pBody, int32_t rawBodyLen, int64_t handle, int64_t handleRef,
EProcFuncType ftype) {
if (rawHeadLen == 0 || pHead == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
const int32_t headLen = CEIL8(rawHeadLen);
static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg *pMsg, EProcFuncType ftype) {
const void *pHead = pMsg;
const void *pBody = pMsg->pCont;
const int16_t rawHeadLen = sizeof(SRpcMsg);
const int32_t rawBodyLen = pMsg->contLen;
const int16_t headLen = CEIL8(rawHeadLen);
const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8;
const int64_t handle = (int64_t)pMsg->info.handle;
taosThreadMutexLock(&queue->mutex);
if (fullLen > queue->avail) {
@ -131,8 +103,8 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
return -1;
}
if (handle != 0 && ftype == DND_FUNC_REQ) {
if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) {
if (ftype == DND_FUNC_REQ && IsReq(pMsg) && pMsg->code == 0 && handle != 0) {
if (taosHashPut(proc->hash, &handle, sizeof(int64_t), &pMsg->info, sizeof(SRpcConnInfo)) != 0) {
taosThreadMutexUnlock(&queue->mutex);
return -1;
}
@ -151,31 +123,31 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
if (queue->tail < queue->head) {
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, rawBodyLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, rawBodyLen);
queue->tail = queue->tail + 8 + headLen + bodyLen;
} else {
int32_t remain = queue->total - queue->tail;
if (remain == 0) {
memcpy(queue->pBuffer + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + 8 + headLen, pBody, rawBodyLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer + 8 + headLen, pBody, rawBodyLen);
queue->tail = 8 + headLen + bodyLen;
} else if (remain == 8) {
memcpy(queue->pBuffer, pHead, rawHeadLen);
memcpy(queue->pBuffer + headLen, pBody, rawBodyLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen, pBody, rawBodyLen);
queue->tail = headLen + bodyLen;
} else if (remain < 8 + headLen) {
memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8);
memcpy(queue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8));
memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
queue->tail = headLen - (remain - 8) + bodyLen;
} else if (remain < 8 + headLen + bodyLen) {
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen);
memcpy(queue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
queue->tail = bodyLen - (remain - 8 - headLen);
} else {
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
memcpy(queue->pBuffer + queue->tail + headLen + 8, pBody, rawBodyLen);
if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + headLen + 8, pBody, rawBodyLen);
queue->tail = queue->tail + headLen + bodyLen + 8;
}
}
@ -185,13 +157,12 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe
taosThreadMutexUnlock(&queue->mutex);
tsem_post(&queue->sem);
dTrace("node:%s, push %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen,
pBody, bodyLen, pos, queue->items);
dTrace("node:%s, push %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype),
pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, pMsg->code, pos, queue->items);
return 0;
}
static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHeadLen, void **ppBody, int32_t *pBodyLen,
EProcFuncType *pFuncType) {
static int32_t dmPopFromProcQueue(SProcQueue *queue, SRpcMsg **ppMsg, EProcFuncType *pFuncType) {
tsem_wait(&queue->sem);
taosThreadMutexLock(&queue->mutex);
@ -217,8 +188,9 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
int32_t bodyLen = CEIL8(rawBodyLen);
void *pHead = taosAllocateQitem(headLen, DEF_QITEM);
void *pBody = rpcMallocCont(bodyLen);
if (pHead == NULL || pBody == NULL) {
void *pBody = NULL;
if (bodyLen > 0) pBody = rpcMallocCont(bodyLen);
if (pHead == NULL || (bodyLen > 0 && pBody == NULL)) {
taosThreadMutexUnlock(&queue->mutex);
tsem_post(&queue->sem);
taosFreeQitem(pHead);
@ -230,31 +202,31 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
const int32_t pos = queue->head;
if (queue->head < queue->tail) {
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, bodyLen);
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, bodyLen);
queue->head = queue->head + 8 + headLen + bodyLen;
} else {
int32_t remain = queue->total - queue->head;
if (remain == 0) {
memcpy(pHead, queue->pBuffer + 8, headLen);
memcpy(pBody, queue->pBuffer + 8 + headLen, bodyLen);
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + 8 + headLen, bodyLen);
queue->head = 8 + headLen + bodyLen;
} else if (remain == 8) {
memcpy(pHead, queue->pBuffer, headLen);
memcpy(pBody, queue->pBuffer + headLen, bodyLen);
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen, bodyLen);
queue->head = headLen + bodyLen;
} else if (remain < 8 + headLen) {
memcpy(pHead, queue->pBuffer + queue->head + 8, remain - 8);
memcpy((char *)pHead + remain - 8, queue->pBuffer, headLen - (remain - 8));
memcpy(pBody, queue->pBuffer + headLen - (remain - 8), bodyLen);
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + headLen - (remain - 8), bodyLen);
queue->head = headLen - (remain - 8) + bodyLen;
} else if (remain < 8 + headLen + bodyLen) {
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, remain - 8 - headLen);
memcpy((char *)pBody + remain - 8 - headLen, queue->pBuffer, bodyLen - (remain - 8 - headLen));
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + 8 + headLen, remain - 8 - headLen);
if (bodyLen > 0) memcpy((char *)pBody + remain - 8 - headLen, queue->pBuffer, bodyLen - (remain - 8 - headLen));
queue->head = bodyLen - (remain - 8 - headLen);
} else {
memcpy(pHead, queue->pBuffer + queue->head + 8, headLen);
memcpy(pBody, queue->pBuffer + queue->head + headLen + 8, bodyLen);
if (bodyLen > 0) memcpy(pBody, queue->pBuffer + queue->head + headLen + 8, bodyLen);
queue->head = queue->head + headLen + bodyLen + 8;
}
}
@ -263,14 +235,12 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe
queue->items--;
taosThreadMutexUnlock(&queue->mutex);
*ppHead = pHead;
*ppBody = pBody;
*pHeadLen = rawHeadLen;
*pBodyLen = rawBodyLen;
*ppMsg = pHead;
(*ppMsg)->pCont = pBody;
*pFuncType = (EProcFuncType)ftype;
dTrace("node:%s, pop %s msg:%p:%d cont:%p:%d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen, pBody,
bodyLen, pos, queue->items);
dTrace("node:%s, pop %s msg:%p type:%d handle:%p len:%d code:0x%x, pos:%d remain:%d", queue->name, dmFuncStr(ftype),
(*ppMsg), (*ppMsg)->msgType, (*ppMsg)->info.handle, (*ppMsg)->contLen, (*ppMsg)->code, pos, queue->items);
return 1;
}
@ -308,18 +278,14 @@ static void *dmConsumChildQueue(void *param) {
SProc *proc = param;
SMgmtWrapper *pWrapper = proc->wrapper;
SProcQueue *queue = proc->cqueue;
void *pHead = NULL;
void *pBody = NULL;
int16_t headLen = 0;
int32_t bodyLen = 0;
int32_t numOfMsgs = 0;
int32_t code = 0;
EProcFuncType ftype = DND_FUNC_REQ;
SRpcMsg *pReq = NULL;
SRpcMsg *pMsg = NULL;
dDebug("node:%s, start to consume from cqueue", proc->name);
do {
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype);
if (numOfMsgs == 0) {
dDebug("node:%s, get no msg from cqueue and exit thread", proc->name);
break;
@ -332,25 +298,24 @@ static void *dmConsumChildQueue(void *param) {
}
if (ftype != DND_FUNC_REQ) {
dFatal("node:%s, get msg:%p from cqueue, invalid ftype:%d", proc->name, pHead, ftype);
taosFreeQitem(pHead);
rpcFreeCont(pBody);
} else {
pReq = pHead;
pReq->pCont = pBody;
code = dmProcessNodeMsg(pWrapper, pReq);
if (code != 0) {
dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pReq, terrstr());
SRpcMsg rspMsg = {
.info = pReq->info,
.pCont = pReq->info.rsp,
.contLen = pReq->info.rspLen,
};
dmPutToProcPQueue(proc, &rspMsg, sizeof(SRpcMsg), rspMsg.pCont, rspMsg.contLen, DND_FUNC_RSP);
taosFreeQitem(pHead);
rpcFreeCont(pBody);
rpcFreeCont(rspMsg.pCont);
dError("node:%s, invalid ftype:%d from cqueue", proc->name, ftype);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
continue;
}
code = dmProcessNodeMsg(pWrapper, pMsg);
if (code != 0) {
dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pMsg, terrstr());
SRpcMsg rsp = {
.code = (terrno != 0 ? terrno : code),
.pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen,
.info = pMsg->info,
};
dmPutToProcPQueue(proc, &rsp, DND_FUNC_RSP);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
} while (1);
@ -361,18 +326,14 @@ static void *dmConsumParentQueue(void *param) {
SProc *proc = param;
SMgmtWrapper *pWrapper = proc->wrapper;
SProcQueue *queue = proc->pqueue;
void *pHead = NULL;
void *pBody = NULL;
int16_t headLen = 0;
int32_t bodyLen = 0;
int32_t numOfMsgs = 0;
int32_t code = 0;
EProcFuncType ftype = DND_FUNC_REQ;
SRpcMsg *pRsp = NULL;
SRpcMsg *pMsg = NULL;
dDebug("node:%s, start to consume from pqueue", proc->name);
do {
numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
numOfMsgs = dmPopFromProcQueue(queue, &pMsg, &ftype);
if (numOfMsgs == 0) {
dDebug("node:%s, get no msg from pqueue and exit thread", proc->name);
break;
@ -385,31 +346,19 @@ static void *dmConsumParentQueue(void *param) {
}
if (ftype == DND_FUNC_RSP) {
pRsp = pHead;
pRsp->pCont = pBody;
dTrace("node:%s, get rsp msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->info.handle);
dmRemoveProcRpcHandle(proc, pRsp->info.handle);
rpcSendResponse(pRsp);
dmRemoveProcRpcHandle(proc, pMsg->info.handle);
rpcSendResponse(pMsg);
} else if (ftype == DND_FUNC_REGIST) {
pRsp = pHead;
pRsp->pCont = pBody;
dTrace("node:%s, get regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code,
pRsp->info.handle);
rpcRegisterBrokenLinkArg(pRsp);
rpcRegisterBrokenLinkArg(pMsg);
} else if (ftype == DND_FUNC_RELEASE) {
pRsp = pHead;
pRsp->pCont = NULL;
dTrace("node:%s, get release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code,
pRsp->info.handle);
dmRemoveProcRpcHandle(proc, pRsp->info.handle);
rpcReleaseHandle(pRsp->info.handle, (int8_t)pRsp->code);
rpcFreeCont(pBody);
dmRemoveProcRpcHandle(proc, pMsg->info.handle);
rpcReleaseHandle(pMsg->info.handle, (int8_t)pMsg->code);
} else {
dFatal("node:%s, get msg:%p from pqueue, invalid ftype:%d", proc->name, pHead, ftype);
rpcFreeCont(pBody);
dError("node:%s, invalid ftype:%d from pqueue", proc->name, ftype);
rpcFreeCont(pMsg->pCont);
}
taosFreeQitem(pHead);
taosFreeQitem(pMsg);
} while (1);
return NULL;
@ -468,51 +417,55 @@ void dmCleanupProc(struct SMgmtWrapper *pWrapper) {
dmCleanupProcQueue(proc->cqueue);
dmCleanupProcQueue(proc->pqueue);
taosHashCleanup(proc->hash);
proc->hash = NULL;
dDebug("node:%s, proc is cleaned up", pWrapper->name);
}
int64_t dmRemoveProcRpcHandle(SProc *proc, void *handle) {
void dmRemoveProcRpcHandle(SProc *proc, void *handle) {
int64_t h = (int64_t)handle;
taosThreadMutexLock(&proc->cqueue->mutex);
int64_t *pRef = taosHashGet(proc->hash, &h, sizeof(int64_t));
int64_t ref = 0;
if (pRef != NULL) {
ref = *pRef;
}
taosHashRemove(proc->hash, &h, sizeof(int64_t));
taosThreadMutexUnlock(&proc->cqueue->mutex);
return ref;
}
void dmCloseProcRpcHandles(SProc *proc) {
taosThreadMutexLock(&proc->cqueue->mutex);
void *h = taosHashIterate(proc->hash, NULL);
while (h != NULL) {
void *handle = *((void **)h);
h = taosHashIterate(proc->hash, h);
dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, handle);
SRpcMsg rpcMsg = {.info.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
SRpcHandleInfo *pInfo = taosHashIterate(proc->hash, NULL);
while (pInfo != NULL) {
dError("node:%s, the child process dies and send an offline rsp to handle:%p", proc->name, pInfo->handle);
SRpcMsg rpcMsg = {.info = *pInfo, .code = TSDB_CODE_NODE_OFFLINE};
rpcSendResponse(&rpcMsg);
pInfo = taosHashIterate(proc->hash, pInfo);
}
taosHashClear(proc->hash);
taosThreadMutexUnlock(&proc->cqueue->mutex);
}
void dmPutToProcPQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
EProcFuncType ftype) {
void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
int32_t retry = 0;
while (dmPushToProcQueue(proc, proc->pqueue, pHead, headLen, pBody, bodyLen, 0, 0, ftype) != 0) {
dWarn("node:%s, failed to put msg:%p to pqueue since %s, retry:%d", proc->name, pHead, terrstr(), retry);
while (1) {
if (dmPushToProcQueue(proc, proc->pqueue, pMsg, ftype) == 0) {
break;
}
if (retry == 10) {
pMsg->code = terrno;
if (pMsg->contLen > 0) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
pMsg->contLen = 0;
}
dError("node:%s, failed to push %s msg:%p type:%d handle:%p then discard data and return error", proc->name,
dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle);
} else {
dError("node:%s, failed to push %s msg:%p type:%d handle:%p len:%d since %s, retry:%d", proc->name,
dmFuncStr(ftype), pMsg, pMsg->msgType, pMsg->info.handle, pMsg->contLen, terrstr(), retry);
retry++;
taosMsleep(retry);
}
}
int32_t dmPutToProcCQueue(SProc *proc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, int64_t ref, EProcFuncType ftype) {
return dmPushToProcQueue(proc, proc->cqueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ref, ftype);
}
int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
return dmPushToProcQueue(proc, proc->cqueue, pMsg, ftype);
}

View File

@ -128,9 +128,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
}
if (InParentProc(pWrapper)) {
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen,
(IsReq(pRpc) && (pRpc->code == 0)) ? pRpc->info.handle : NULL, pRpc->info.refId,
DND_FUNC_REQ);
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);
} else {
code = dmProcessNodeMsg(pWrapper, pMsg);
}
@ -255,23 +253,23 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) {
}
}
static inline void dmSendRsp(const SRpcMsg *pMsg) {
static inline void dmSendRsp(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_RSP);
} else {
if (pMsg->code == TSDB_CODE_NODE_REDIRECT) {
dmSendRpcRedirectRsp(pMsg);
} else {
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
} else {
rpcSendResponse(pMsg);
}
}
}
static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
SMgmtWrapper *pWrapper = pRsp->info.wrapper;
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP);
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
} else {
SRpcMsg rsp = {0};
SMEpSet msg = {.epSet = *pNewEpSet};
@ -281,7 +279,7 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe
tSerializeSMEpSet(rsp.pCont, len, &msg);
rsp.code = TSDB_CODE_RPC_REDIRECT;
rsp.info = pRsp->info;
rsp.info = pMsg->info;
rpcSendResponse(&rsp);
}
}
@ -289,7 +287,7 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST);
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST);
} else {
rpcRegisterBrokenLinkArg(pMsg);
}
@ -299,7 +297,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
SMgmtWrapper *pWrapper = pHandle->wrapper;
if (InChildProc(pWrapper)) {
SRpcMsg msg = {.code = type, .info = *pHandle};
dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE);
dmPutToProcPQueue(&pWrapper->proc, &msg, DND_FUNC_RELEASE);
} else {
rpcReleaseHandle(pHandle->handle, type);
}

View File

@ -122,7 +122,7 @@ static void mndCleanupTimer(SMnode *pMnode) {
pMnode->stopped = true;
if (taosCheckPthreadValid(pMnode->thread)) {
taosThreadJoin(pMnode->thread, NULL);
memset(&pMnode->thread, 0, sizeof(pMnode->thread));
taosThreadClear(&pMnode->thread);
}
}

View File

@ -16,10 +16,9 @@
#include "tcache.h"
void reportStartup(const char *name, const char *desc) {}
void sendRsp(const SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); }
void sendRsp(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); }
int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
// rpcFreeCont(pMsg->pCont);
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}

View File

@ -213,8 +213,9 @@ int32_t sdbGetRawBinary(SSdbRaw *pRaw, int32_t dataPos, char *pVal, int32_t valL
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
return -1;
}
if (pVal != NULL) {
memcpy(pVal, pRaw->pData + dataPos, valLen);
}
return 0;
}

View File

@ -2497,7 +2497,7 @@ bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo
pInfo->min = MAX_TS_KEY;
pInfo->max = 0;
if (pCtx->numOfParams == 3) {
if (pCtx->numOfParams == 2) {
pInfo->timeUnit = pCtx->param[1].param.i;
} else {
pInfo->timeUnit = 1;
@ -2521,7 +2521,6 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) {
}
if (pInput->colDataAggIsSet) {
if (pInfo->min == MAX_TS_KEY) {
pInfo->min = GET_INT64_VAL(&pAgg->min);
pInfo->max = GET_INT64_VAL(&pAgg->max);

View File

@ -473,17 +473,16 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR
int32_t sz = 0;
char* ch = (char*)fstSliceData(s, &sz);
// char* tmp = taosMemoryCalloc(1, sz + 1);
// memcpy(tmp, ch, sz);
char* tmp = taosMemoryCalloc(1, sz + 1);
memcpy(tmp, ch, sz);
if (0 != strncmp(ch, p, skip)) {
if (0 != strncmp(tmp, p, skip)) {
swsResultDestroy(rt);
// taosMemoryFree(tmp);
taosMemoryFree(tmp);
break;
}
TExeCond cond = cmpFn(ch + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType));
TExeCond cond = cmpFn(tmp + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType));
if (MATCH == cond) {
tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
} else if (CONTINUE == cond) {
@ -491,7 +490,7 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR
swsResultDestroy(rt);
break;
}
// taosMemoryFree(tmp);
taosMemoryFree(tmp);
swsResultDestroy(rt);
}
streamWithStateDestroy(st);

View File

@ -578,3 +578,40 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_FLOAT) {
EXPECT_EQ(1000, taosArrayGetSize(res));
}
}
TEST_F(JsonEnv, testWriteJsonTfileAndCache_DOUBLE) {
{
double val = 10.0;
std::string colName("test1");
for (int i = 0; i < 1000; i++) {
WriteData(index, colName, TSDB_DATA_TYPE_DOUBLE, &val, sizeof(val), i);
}
}
{
double val = 2.0;
std::string colName("test1");
for (int i = 0; i < 1000; i++) {
WriteData(index, colName, TSDB_DATA_TYPE_DOUBLE, &val, sizeof(val), i + 1000);
}
}
{
SArray* res = NULL;
std::string colName("test1");
double val = 1.9;
Search(index, colName, TSDB_DATA_TYPE_DOUBLE, &val, sizeof(val), QUERY_GREATER_EQUAL, &res);
EXPECT_EQ(2000, taosArrayGetSize(res));
}
{
SArray* res = NULL;
std::string colName("test1");
double val = 2.1;
Search(index, colName, TSDB_DATA_TYPE_DOUBLE, &val, sizeof(val), QUERY_GREATER_EQUAL, &res);
EXPECT_EQ(1000, taosArrayGetSize(res));
}
{
std::string colName("test1");
SArray* res = NULL;
double val = 2.1;
Search(index, colName, TSDB_DATA_TYPE_DOUBLE, &val, sizeof(val), QUERY_GREATER_EQUAL, &res);
EXPECT_EQ(1000, taosArrayGetSize(res));
}
}

View File

@ -237,6 +237,7 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
int32_t ret = 0;
atomic_store_8(&io->isStart, 0);
taosThreadJoin(io->consumerTid, NULL);
taosThreadClear(&io->consumerTid);
taosTmrCleanUp(io->timerMgr);
return ret;
}

View File

@ -924,8 +924,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
arg->param1 = pMsg;
arg->param2 = pThrd;
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
cliDestroy((uv_handle_t*)pConn->stream);
cliDestroyConn(pConn, true);
return -1;
}
} else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {

View File

@ -244,6 +244,7 @@ static void walStopThread() {
if (taosCheckPthreadValid(tsWal.thread)) {
taosThreadJoin(tsWal.thread, NULL);
taosThreadClear(&tsWal.thread);
}
wDebug("wal thread is stopped");

View File

@ -145,6 +145,7 @@ void taosCloseLog() {
taosStopLog();
if (tsLogObj.logHandle != NULL && taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) {
taosThreadJoin(tsLogObj.logHandle->asyncThread, NULL);
taosThreadClear(&tsLogObj.logHandle->asyncThread);
}
tsLogInited = 0;

View File

@ -209,6 +209,7 @@ void taosCleanUpScheduler(void *param) {
for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
if (taosCheckPthreadValid(pSched->qthread[i])) {
taosThreadJoin(pSched->qthread[i], NULL);
taosThreadClear(&pSched->qthread[i]);
}
}

View File

@ -57,6 +57,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
if (worker == NULL) continue;
if (taosCheckPthreadValid(worker->thread)) {
taosThreadJoin(worker->thread, NULL);
taosThreadClear(&worker->thread);
}
}
@ -179,6 +180,7 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
SWWorker *worker = pool->workers + i;
if (taosCheckPthreadValid(worker->thread)) {
taosThreadJoin(worker->thread, NULL);
taosThreadClear(&worker->thread);
taosFreeQall(worker->qall);
taosCloseQset(worker->qset);
}

View File

@ -436,6 +436,7 @@ int32_t main(int32_t argc, char *argv[]) {
taosMsleep(300);
for (int32_t i = 0; i < numOfThreads; i++) {
taosThreadJoin(pInfo[i].thread, NULL);
taosThreadClear(&pInfo[i].thread);
}
int64_t maxDelay = 0;

View File

@ -537,6 +537,7 @@ int main(int32_t argc, char* argv[]) {
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
taosThreadClear(&g_stConfInfo.stThreads[i].thread);
}
// printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);

View File

@ -56,6 +56,7 @@ void simFreeScript(SScript *script) {
bgScript->killed = true;
if (taosCheckPthreadValid(bgScript->bgPid)) {
taosThreadJoin(bgScript->bgPid, NULL);
taosThreadClear(&bgScript->bgPid);
}
simDebug("script:%s, background thread joined", bgScript->fileName);

View File

@ -985,6 +985,7 @@ int32_t shellExecute() {
while (1) {
taosThreadCreate(&shell.pid, NULL, shellThreadLoop, shell.conn);
taosThreadJoin(shell.pid, NULL);
taosThreadClear(&shell.pid);
}
return 0;