enh: refator index/transport code

This commit is contained in:
yihaoDeng 2022-05-21 17:29:50 +08:00
parent 84eeb2ab70
commit 4d18fa0875
8 changed files with 31 additions and 84 deletions

View File

@ -19,38 +19,11 @@
#include "nodes.h" #include "nodes.h"
#include "tdatablock.h" #include "tdatablock.h"
typedef struct SIFCtx { // clang-format off
int32_t code; #define SIF_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
SHashObj *pRes; /* element is SScalarParam */ #define SIF_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
bool noExec; // true: just iterate condition tree, and add hint to executor plan #define SIF_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
// SIdxFltStatus st; // clang-format on
} SIFCtx;
#define SIF_ERR_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
return _code; \
} \
} while (0)
#define SIF_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
} \
return _code; \
} while (0)
#define SIF_ERR_JRET(c) \
do { \
code = c; \
if (code != TSDB_CODE_SUCCESS) { \
terrno = code; \
goto _return; \
} \
} while (0)
typedef struct SIFParam { typedef struct SIFParam {
SHashObj *pFilter; SHashObj *pFilter;
@ -65,6 +38,13 @@ typedef struct SIFParam {
char colName[TSDB_COL_NAME_LEN]; char colName[TSDB_COL_NAME_LEN];
} SIFParam; } SIFParam;
typedef struct SIFCtx {
int32_t code;
SHashObj *pRes; /* element is SIFParam */
bool noExec; // true: just iterate condition tree, and add hint to executor plan
// SIdxFltStatus st;
} SIFCtx;
static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) { static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
if (src == OP_TYPE_GREATER_THAN) { if (src == OP_TYPE_GREATER_THAN) {
*dst = QUERY_GREATER_THAN; *dst = QUERY_GREATER_THAN;

View File

@ -132,12 +132,12 @@ int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
// int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); // int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// clang-format off // clang-format off
#define indexFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while (0) #define indexFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("INDEX FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while (0)
#define indexError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("index ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while (0) #define indexError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("INDEX ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while (0)
#define indexWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("index WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while (0) #define indexWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("INDEX WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while (0)
#define indexInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("index ", DEBUG_INFO, 255, __VA_ARGS__); } } while (0) #define indexInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("INDEX ", DEBUG_INFO, 255, __VA_ARGS__); } } while (0)
#define indexDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("index ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__);} } while (0) #define indexDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("INDEX ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__);} } while (0)
#define indexTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("index ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__);} } while (0) #define indexTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("INDEX ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__);} } while (0)
// clang-format on // clang-format on
#define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0) #define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0)

View File

@ -563,10 +563,11 @@ int32_t indexSerialCacheKey(ICacheKey* key, char* buf) {
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON); bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON);
char* p = buf; char* p = buf;
SERIALIZE_MEM_TO_BUF(buf, key, suid); char tbuf[65] = {0};
indexInt2str((int64_t)key->suid, tbuf, 0);
SERIALIZE_STR_VAR_TO_BUF(buf, tbuf, strlen(tbuf));
SERIALIZE_VAR_TO_BUF(buf, '_', char); SERIALIZE_VAR_TO_BUF(buf, '_', char);
// SERIALIZE_MEM_TO_BUF(buf, key, colType);
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
if (hasJson) { if (hasJson) {
SERIALIZE_STR_VAR_TO_BUF(buf, JSON_COLUMN, strlen(JSON_COLUMN)); SERIALIZE_STR_VAR_TO_BUF(buf, JSON_COLUMN, strlen(JSON_COLUMN));
} else { } else {

View File

@ -385,7 +385,7 @@ void indexCacheDebug(IndexCache* cache) {
void indexCacheDestroySkiplist(SSkipList* slt) { void indexCacheDestroySkiplist(SSkipList* slt) {
SSkipListIterator* iter = tSkipListCreateIter(slt); SSkipListIterator* iter = tSkipListCreateIter(slt);
while (tSkipListIterNext(iter)) { while (iter != NULL && tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter); SSkipListNode* node = tSkipListIterGet(iter);
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
if (ct != NULL) { if (ct != NULL) {

View File

@ -172,7 +172,6 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
oldReader->remove = true; oldReader->remove = true;
tfileReaderUnRef(oldReader); tfileReaderUnRef(oldReader);
} }
taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*)); taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
tfileReaderRef(reader); tfileReaderRef(reader);
return; return;
@ -500,15 +499,15 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) { int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) {
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType; EIndexQueryType qtype = query->qType;
int ret = 0;
if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) { if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
return tfSearch[1][qtype](reader, term, tr); ret = tfSearch[1][qtype](reader, term, tr);
} else { } else {
return tfSearch[0][qtype](reader, term, tr); ret = tfSearch[0][qtype](reader, term, tr);
} }
tfileReaderUnRef(reader); tfileReaderUnRef(reader);
return 0; return ret;
} }
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) { TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) {

View File

@ -20,22 +20,12 @@ extern "C" {
#endif #endif
#include <uv.h> #include <uv.h>
#include "lz4.h"
#include "os.h" #include "os.h"
#include "osSocket.h"
#include "taoserror.h" #include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "theap.h" #include "theap.h"
#include "tidpool.h"
#include "tmd5.h"
#include "tmempool.h"
#include "tmsg.h"
#include "transLog.h" #include "transLog.h"
#include "transportInt.h" #include "transportInt.h"
#include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "ttimer.h"
#include "tutil.h" #include "tutil.h"
typedef void* queue[2]; typedef void* queue[2];
@ -108,27 +98,6 @@ typedef void* queue[2];
#define TRANS_RETRY_INTERVAL 15 // ms retry interval #define TRANS_RETRY_INTERVAL 15 // ms retry interval
#define TRANS_CONN_TIMEOUT 3 // connect timeout #define TRANS_CONN_TIMEOUT 3 // connect timeout
typedef struct {
SRpcInfo* pRpc; // associated SRpcInfo
SEpSet epSet; // ip list provided by app
void* ahandle; // handle provided by app
// struct SRpcConn* pConn; // pConn allocated
tmsg_t msgType; // message type
uint8_t* pCont; // content provided by app
int32_t contLen; // content length
// int32_t code; // error code
// int16_t numOfTry; // number of try for different servers
// int8_t oldInUse; // server EP inUse passed by app
// int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef
SRpcMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
char* ip;
uint32_t port;
// SEpSet* pSet; // for synchronous API
} SRpcReqContext;
typedef SRpcMsg STransMsg; typedef SRpcMsg STransMsg;
typedef SRpcCtx STransCtx; typedef SRpcCtx STransCtx;
typedef SRpcCtxVal STransCtxVal; typedef SRpcCtxVal STransCtxVal;

View File

@ -22,15 +22,13 @@
#include "lz4.h" #include "lz4.h"
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tglobal.h"
#include "thash.h" #include "thash.h"
#include "tidpool.h" #include "tref.h"
#include "tmsg.h" #include "tmsg.h"
#include "transLog.h" #include "transLog.h"
#include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "tglobal.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {

View File

@ -295,14 +295,14 @@ static void uvHandleReq(SSrvConn* pConn) {
// no ref here // no ref here
} }
// if pHead->noResp = 1, // pHead->noResp = 1,
// 1. server application should not send resp on handle // 1. server application should not send resp on handle
// 2. once send out data, cli conn released to conn pool immediately // 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist // 3. not mixed with persist
transMsg.info.handle = (void*)uvAcquireExHandle(pConn->refId); transMsg.info.handle = (void*)uvAcquireExHandle(pConn->refId);
tTrace("server handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.info.handle, pConn, pConn->refId);
transMsg.info.refId = pConn->refId; transMsg.info.refId = pConn->refId;
tTrace("server handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.info.handle, pConn, pConn->refId);
assert(transMsg.info.handle != NULL); assert(transMsg.info.handle != NULL);
if (pHead->noResp == 1) { if (pHead->noResp == 1) {
transMsg.info.refId = -1; transMsg.info.refId = -1;