diff --git a/include/common/tcommon.h b/include/common/tcommon.h index a04e2afc94..d0ec5c9296 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -25,24 +25,6 @@ extern "C" { #endif -// typedef struct STimeWindow { -// TSKEY skey; -// TSKEY ekey; -// } STimeWindow; - -// typedef struct { -// int32_t dataLen; -// char name[TSDB_TABLE_FNAME_LEN]; -// char *data; -// } STagData; - -// typedef struct SSchema { -// uint8_t type; -// char name[TSDB_COL_NAME_LEN]; -// int16_t colId; -// int16_t bytes; -// } SSchema; - enum { TMQ_CONF__RESET_OFFSET__LATEST = -1, TMQ_CONF__RESET_OFFSET__EARLIEAST = -2, @@ -50,7 +32,8 @@ enum { }; enum { - TMQ_MSG_TYPE__POLL_RSP = 0, + TMQ_MSG_TYPE__DUMMY = 0, + TMQ_MSG_TYPE__POLL_RSP, TMQ_MSG_TYPE__EP_RSP, }; @@ -285,4 +268,4 @@ typedef struct SSessionWindow { } #endif -#endif /*_TD_COMMON_DEF_H_*/ +#endif /*_TD_COMMON_DEF_H_*/ diff --git a/include/common/tep.h b/include/common/tdatablock.h similarity index 100% rename from include/common/tep.h rename to include/common/tdatablock.h diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 25d668e788..1b08d6f241 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -70,7 +70,7 @@ typedef uint16_t tmsg_t; typedef enum { HEARTBEAT_TYPE_MQ = 0, - HEARTBEAT_TYPE_QUERY = 1, + HEARTBEAT_TYPE_QUERY, // types can be added here // HEARTBEAT_TYPE_MAX diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index a4c0be27ab..d4af51fc21 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -89,25 +89,6 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds); */ int32_t qRetrieveQueryResultInfo(qTaskInfo_t tinfo, bool* buildRes, void* pRspContext); -/** - * - * Retrieve the actual results to fill the response message payload. - * Note that this function must be executed after qRetrieveQueryResultInfo is invoked. - * - * @param tinfo tinfo object - * @param pRsp response message - * @param contLen payload length - * @return - */ -//int32_t qDumpRetrieveResult(qTaskInfo_t tinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec); - -/** - * return the transporter context (RPC) - * @param tinfo - * @return - */ -void* qGetResultRetrieveMsg(qTaskInfo_t tinfo); - /** * kill the ongoing query and free the query handle and corresponding resources automatically * @param tinfo qhandle diff --git a/include/libs/scalar/filter.h b/include/libs/scalar/filter.h index fedb487931..a93180800e 100644 --- a/include/libs/scalar/filter.h +++ b/include/libs/scalar/filter.h @@ -19,10 +19,12 @@ extern "C" { #endif +#include "tcommon.h" +#include "nodes.h" + typedef struct SFilterInfo SFilterInfo; typedef int32_t (*filer_get_col_from_id)(void *, int32_t, void **); - enum { FLT_OPTION_NO_REWRITE = 1, FLT_OPTION_TIMESTAMP = 2, @@ -34,7 +36,6 @@ typedef struct SFilterColumnParam{ SArray* pDataBlock; } SFilterColumnParam; - extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t options); extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols); extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param); diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 53fad4607a..23e3ef7525 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -21,7 +21,7 @@ extern "C" { #endif #include -#include +#include #include "taosdef.h" #include "trpc.h" diff --git a/include/os/osFile.h b/include/os/osFile.h index 7e3a5277c8..6ddf1e33c6 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -22,6 +22,7 @@ extern "C" { #include "osSocket.h" +// If the error is in a third-party library, place this header file under the third-party library header file. #ifndef ALLOW_FORBID_FUNC #define open OPEN_FUNC_TAOS_FORBID #define fopen FOPEN_FUNC_TAOS_FORBID @@ -31,6 +32,8 @@ extern "C" { #define fstat FSTAT_FUNC_TAOS_FORBID #define close CLOSE_FUNC_TAOS_FORBID #define fclose FCLOSE_FUNC_TAOS_FORBID + #define fsync FSYNC_FUNC_TAOS_FORBID + // #define fflush FFLUSH_FUNC_TAOS_FORBID #endif #ifndef PATH_MAX @@ -47,13 +50,13 @@ typedef struct TdFile *TdFilePtr; #define TD_FILE_TEXT 0x0020 #define TD_FILE_AUTO_DEL 0x0040 #define TD_FILE_EXCL 0x0080 -#define TD_FILE_STREAM 0x0100 // Only support taosFprintfFile, taosGetLineFile, taosGetLineFile, taosEOFFile +#define TD_FILE_STREAM 0x0100 // Only support taosFprintfFile, taosGetLineFile, taosEOFFile TdFilePtr taosOpenFile(const char *path,int32_t tdFileOptions); #define TD_FILE_ACCESS_EXIST_OK 0x1 #define TD_FILE_ACCESS_READ_OK 0x2 #define TD_FILE_ACCESS_WRITE_OK 0x4 -bool taosCheckAccessFile(const char *pathname, int mode); +bool taosCheckAccessFile(const char *pathname, int mode); int32_t taosLockFile(TdFilePtr pFile); int32_t taosUnLockFile(TdFilePtr pFile); @@ -80,6 +83,7 @@ int64_t taosCloseFile(TdFilePtr *ppFile); int32_t taosRenameFile(const char *oldName, const char *newName); int64_t taosCopyFile(const char *from, const char *to); +int32_t taosRemoveFile(const char *path); void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath); diff --git a/include/util/thash.h b/include/util/thash.h index 017cc8696f..57b20c65ee 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -28,7 +28,8 @@ typedef int32_t (*_equal_fn_t)(const void *, const void *, size_t len); typedef void (*_hash_before_fn_t)(void *); typedef void (*_hash_free_fn_t)(void *); -#define HASH_NODE_EXIST(code) (code == -2) +#define HASH_KEY_ALREADY_EXISTS (-2) +#define HASH_NODE_EXIST(code) (code == HASH_KEY_ALREADY_EXISTS) /** * murmur hash algorithm @@ -49,24 +50,14 @@ uint32_t taosIntHash_32(const char *key, uint32_t len); uint32_t taosIntHash_64(const char *key, uint32_t len); _hash_fn_t taosGetDefaultHashFunction(int32_t type); - _equal_fn_t taosGetDefaultEqualFunction(int32_t type); -typedef struct SHashNode { - struct SHashNode *next; - uint32_t hashVal; // the hash value of key - uint32_t dataLen; // length of data - uint32_t keyLen; // length of the key - uint16_t refCount; // reference count - int8_t removed; // flag to indicate removed - char data[]; -} SHashNode; - typedef enum SHashLockTypeE { HASH_NO_LOCK = 0, HASH_ENTRY_LOCK = 1, } SHashLockTypeE; +typedef struct SHashNode SHashNode; typedef struct SHashObj SHashObj; /** diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index c93ea1aabe..0e1b564b03 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -20,12 +20,12 @@ extern "C" { #endif -#include "tcommon.h" #include "parser.h" #include "query.h" #include "taos.h" +#include "tcommon.h" +#include "tdatablock.h" #include "tdef.h" -#include "tep.h" #include "thash.h" #include "tlist.h" #include "tmsg.h" @@ -47,12 +47,12 @@ extern "C" { typedef struct SAppInstInfo SAppInstInfo; -typedef struct SHbConnInfo { +typedef struct { void* param; SClientHbReq* req; } SHbConnInfo; -typedef struct SAppHbMgr { +typedef struct { char* key; // statistics int32_t reportCnt; @@ -68,11 +68,11 @@ typedef struct SAppHbMgr { SHashObj* connInfo; // hash } SAppHbMgr; -typedef int32_t (*FHbRspHandle)(struct SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp); +typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp); typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq* req); -typedef struct SClientHbMgr { +typedef struct { int8_t inited; // ctl int8_t threadStop; @@ -108,13 +108,13 @@ typedef struct SHeartBeatInfo { } SHeartBeatInfo; struct SAppInstInfo { - int64_t numOfConns; - SCorEpSet mgmtEp; - SInstanceSummary summary; - SList* pConnList; // STscObj linked list - int64_t clusterId; - void* pTransporter; - struct SAppHbMgr* pAppHbMgr; + int64_t numOfConns; + SCorEpSet mgmtEp; + SInstanceSummary summary; + SList* pConnList; // STscObj linked list + int64_t clusterId; + void* pTransporter; + SAppHbMgr* pAppHbMgr; }; typedef struct SAppInfo { @@ -141,10 +141,6 @@ typedef struct STscObj { SAppInstInfo* pAppInfo; } STscObj; -typedef struct SMqConsumer { - STscObj* pTscObj; -} SMqConsumer; - typedef struct SReqResultInfo { const char* pRspMsg; const char* pData; @@ -172,7 +168,7 @@ typedef struct SRequestSendRecvBody { SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed. SDataBuf requestMsg; int64_t queryJob; // query job, created according to sql query DAG. - struct SQueryDag* pDag; // the query dag, generated according to the sql statement. + struct SQueryDag* pDag; // the query dag, generated according to the sql statement. SReqResultInfo resInfo; } SRequestSendRecvBody; diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index ed5b5f9bbe..b1e9de8000 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -23,7 +23,7 @@ static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); static void hbStopThread(); -static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } +static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { int32_t code = 0; @@ -104,7 +104,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo return TSDB_CODE_SUCCESS; } -static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { +static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); if (NULL == info) { tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType); @@ -163,7 +163,7 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRs return TSDB_CODE_SUCCESS; } -static int32_t hbMqAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) { +static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) { static int32_t emptyRspNum = 0; if (code != 0) { tfree(param); @@ -226,7 +226,11 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl db->vgVersion = htonl(db->vgVersion); } - SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs}; + SKv kv = { + .key = HEARTBEAT_KEY_DBINFO, + .valueLen = sizeof(SDbVgVersion) * dbNum, + .value = dbs, + }; tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen); @@ -256,7 +260,11 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC stb->tversion = htons(stb->tversion); } - SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = sizeof(SSTableMetaVersion) * stbNum, .value = stbs}; + SKv kv = { + .key = HEARTBEAT_KEY_STBINFO, + .valueLen = sizeof(SSTableMetaVersion) * stbNum, + .value = stbs, + }; tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen); @@ -288,7 +296,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req return TSDB_CODE_SUCCESS; } -int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {} +int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; } void hbMgrInitMqHbHandle() { clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; @@ -396,7 +404,7 @@ static void *hbThreadFunc(void *param) { free(buf); break; } - pInfo->fp = hbMqAsyncCallBack; + pInfo->fp = hbAsyncCallBack; pInfo->msgInfo.pData = buf; pInfo->msgInfo.len = tlen; pInfo->msgType = TDMT_MND_HEARTBEAT; @@ -448,7 +456,6 @@ static void hbStopThread() { } SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { - /*return NULL;*/ hbMgrInit(); SAppHbMgr *pAppHbMgr = malloc(sizeof(SAppHbMgr)); if (pAppHbMgr == NULL) { @@ -507,7 +514,6 @@ void appHbMgrCleanup(void) { } int hbMgrInit() { - /*return 0;*/ // init once int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); if (old == 1) return 0; @@ -525,7 +531,7 @@ int hbMgrInit() { } void hbMgrCleanUp() { - return; +#if 0 hbStopThread(); // destroy all appHbMgr @@ -538,6 +544,7 @@ void hbMgrCleanUp() { pthread_mutex_unlock(&clientHbMgr.lock); clientHbMgr.appHbMgrs = NULL; +#endif } int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) { @@ -564,9 +571,11 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo * } int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { - /*return 0;*/ - SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY}; - SHbConnInfo info = {0}; + SClientHbKey connKey = { + .connId = connId, + .hbType = HEARTBEAT_TYPE_QUERY, + }; + SHbConnInfo info = {0}; switch (hbType) { case HEARTBEAT_TYPE_QUERY: { @@ -587,7 +596,6 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int3 } void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { - /*return;*/ int32_t code = 0; code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); @@ -599,7 +607,6 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *key, void *value, int32_t keyLen, int32_t valueLen) { - return 0; // find req by connection id SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); ASSERT(pReq != NULL); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a0ba668f8a..259d0e5799 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -4,8 +4,8 @@ #include "parser.h" #include "planner.h" #include "scheduler.h" +#include "tdatablock.h" #include "tdef.h" -#include "tep.h" #include "tglobal.h" #include "tmsgtype.h" #include "tpagedbuf.h" diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index d9ab23b9fa..35fc557eea 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -20,8 +20,8 @@ #include "parser.h" #include "planner.h" #include "scheduler.h" +#include "tdatablock.h" #include "tdef.h" -#include "tep.h" #include "tglobal.h" #include "tmsgtype.h" #include "tpagedbuf.h" @@ -59,6 +59,7 @@ struct tmq_t { char groupId[256]; char clientId[256]; int8_t autoCommit; + int8_t inWaiting; int64_t consumerId; int32_t epoch; int32_t resetOffsetCfg; @@ -66,9 +67,12 @@ struct tmq_t { STscObj* pTscObj; tmq_commit_cb* commit_cb; int32_t nextTopicIdx; + int32_t waitingRequest; + int32_t readyRequest; SArray* clientTopics; // SArray STaosQueue* mqueue; // queue of tmq_message_t STaosQall* qall; + tsem_t rspSem; // stat int64_t pollCnt; }; @@ -117,10 +121,12 @@ typedef struct { } SMqAskEpCbParam; typedef struct { - tmq_t* tmq; - SMqClientVg* pVg; - int32_t epoch; - tsem_t rspSem; + tmq_t* tmq; + SMqClientVg* pVg; + int32_t epoch; + tsem_t rspSem; + tmq_message_t** msg; + int32_t sync; } SMqPollCbParam; typedef struct { @@ -196,6 +202,26 @@ int32_t tmq_list_append(tmq_list_t* ptr, const char* src) { return 0; } +void tmqClearUnhandleMsg(tmq_t* tmq) { + tmq_message_t* msg; + while (1) { + taosGetQitem(tmq->qall, (void**)&msg); + if (msg) + taosFreeQitem(msg); + else + break; + } + + taosReadAllQitems(tmq->mqueue, tmq->qall); + while (1) { + taosGetQitem(tmq->qall, (void**)&msg); + if (msg) + taosFreeQitem(msg); + else + break; + } +} + int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; pParam->rspErr = code; @@ -205,16 +231,11 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; - tmq_resp_err_t rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL; + pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL; if (pParam->tmq->commit_cb) { - pParam->tmq->commit_cb(pParam->tmq, rspErr, NULL, NULL); - } - if (!pParam->async) - tsem_post(&pParam->rspSem); - else { - tsem_destroy(&pParam->rspSem); - free(param); + pParam->tmq->commit_cb(pParam->tmq, pParam->rspErr, NULL, NULL); } + if (!pParam->async) tsem_post(&pParam->rspSem); return 0; } @@ -240,9 +261,12 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs return NULL; } pTmq->pTscObj = (STscObj*)conn; + pTmq->inWaiting = 0; pTmq->status = 0; pTmq->pollCnt = 0; pTmq->epoch = 0; + pTmq->waitingRequest = 0; + pTmq->readyRequest = 0; // set conf strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); @@ -250,6 +274,8 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs pTmq->commit_cb = conf->commit_cb; pTmq->resetOffsetCfg = conf->resetOffset; + tsem_init(&pTmq->rspSem, 0, 0); + pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); @@ -315,6 +341,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in } pParam->tmq = tmq; tsem_init(&pParam->rspSem, 0, 0); + pParam->async = async; pRequest->body.requestMsg = (SDataBuf){ .pData = buf, @@ -335,6 +362,9 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in resp = pParam->rspErr; } + tsem_destroy(&pParam->rspSem); + free(pParam); + if (pArray) { taosArrayDestroy(pArray); } @@ -576,7 +606,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { if (tmq_message == NULL) return 0; - SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message; + SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; return pRsp->skipLogNum; } @@ -625,56 +655,74 @@ void tmqShowMsg(tmq_message_t* tmq_message) { } int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { - printf("recv poll\n"); + /*printf("recv poll\n");*/ SMqPollCbParam* pParam = (SMqPollCbParam*)param; SMqClientVg* pVg = pParam->pVg; tmq_t* tmq = pParam->tmq; if (code != 0) { printf("msg discard\n"); - if (pParam->epoch == tmq->epoch) { - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - } - return 0; + goto WRITE_QUEUE_FAIL; } int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch; int32_t tmqEpoch = atomic_load_32(&tmq->epoch); if (msgEpoch < tmqEpoch) { + tsem_post(&tmq->rspSem); printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch); return 0; } if (msgEpoch != tmqEpoch) { printf("mismatch rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch); + } else { + atomic_sub_fetch_32(&tmq->waitingRequest, 1); } +#if 0 + if (pParam->sync == 1) { + /**pParam->msg = malloc(sizeof(tmq_message_t));*/ + *pParam->msg = taosAllocateQitem(sizeof(tmq_message_t)); + if (*pParam->msg) { + memcpy(*pParam->msg, pMsg->pData, sizeof(SMqRspHead)); + tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &((*pParam->msg)->consumeRsp)); + if ((*pParam->msg)->consumeRsp.numOfTopics != 0) { + pVg->currentOffset = (*pParam->msg)->consumeRsp.rspOffset; + } + taosWriteQitem(tmq->mqueue, *pParam->msg); + tsem_post(&pParam->rspSem); + return 0; + } + tsem_post(&pParam->rspSem); + return -1; + } +#endif + /*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/ tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); if (pRsp == NULL) { - printf("fail\n"); - return -1; + goto WRITE_QUEUE_FAIL; } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ if (pRsp->consumeRsp.numOfTopics == 0) { - printf("no data\n"); - if (pParam->epoch == tmq->epoch) { - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - } + /*printf("no data\n");*/ taosFreeQitem(pRsp); - return 0; + goto WRITE_QUEUE_FAIL; } + pRsp->extra = pParam->pVg; taosWriteQitem(tmq->mqueue, pRsp); - printf("poll in queue\n"); - /*pParam->rspMsg = (tmq_message_t*)pRsp;*/ - /*pVg->currentOffset = pRsp->consumeRsp.rspOffset;*/ - - /*printf("rsp offset: %ld\n", rsp.rspOffset);*/ - /*printf("-----msg begin----\n");*/ - /*printf("\n-----msg end------\n");*/ + atomic_add_fetch_32(&tmq->readyRequest, 1); + tsem_post(&tmq->rspSem); return 0; + +WRITE_QUEUE_FAIL: + if (pParam->epoch == tmq->epoch) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + } + tsem_post(&tmq->rspSem); + return code; } bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { @@ -711,81 +759,94 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { tmq_t* tmq = pParam->tmq; if (code != 0) { printf("get topic endpoint error, not ready, wait:%d\n", pParam->sync); - if (pParam->sync) { - tsem_post(&pParam->rspSem); - } - return 0; + goto END; } - tscDebug("tmq ask ep cb called"); + + // tmq's epoch is monotomically increase, + // so it's safe to discard any old epoch msg. + // epoch will only increase when received newer epoch ep msg + SMqRspHead* head = pMsg->pData; + int32_t epoch = atomic_load_32(&tmq->epoch); + if (head->epoch <= epoch) { + goto END; + } + if (pParam->sync) { - SMqRspHead* head = pMsg->pData; SMqCMGetSubEpRsp rsp; tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/ - int32_t epoch = atomic_load_32(&tmq->epoch); - if (head->epoch > epoch && tmqUpdateEp(tmq, head->epoch, &rsp)) { + if (tmqUpdateEp(tmq, head->epoch, &rsp)) { atomic_store_64(&tmq->status, TMQ_CONSUMER_STATUS__READY); } - tsem_post(&pParam->rspSem); tDeleteSMqCMGetSubEpRsp(&rsp); } else { tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); if (pRsp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = -1; + goto END; } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->getEpRsp); + taosWriteQitem(tmq->mqueue, pRsp); + tsem_post(&tmq->rspSem); } - return 0; + +END: + if (pParam->sync) { + tsem_post(&pParam->rspSem); + } + return code; } int32_t tmqAskEp(tmq_t* tmq, bool sync) { - printf("ask ep sync %d\n", sync); int32_t tlen = sizeof(SMqCMGetSubEpReq); - SMqCMGetSubEpReq* buf = malloc(tlen); - if (buf == NULL) { + SMqCMGetSubEpReq* req = malloc(tlen); + if (req == NULL) { tscError("failed to malloc get subscribe ep buf"); - goto END; + return -1; } - buf->consumerId = htobe64(tmq->consumerId); - buf->epoch = htonl(tmq->epoch); - strcpy(buf->cgroup, tmq->groupId); - - SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP); - if (pRequest == NULL) { - tscError("failed to malloc subscribe ep request"); - goto END; - } - - pRequest->body.requestMsg = (SDataBuf){ - .pData = buf, - .len = tlen, - .handle = NULL, - }; + req->consumerId = htobe64(tmq->consumerId); + req->epoch = htonl(tmq->epoch); + strcpy(req->cgroup, tmq->groupId); SMqAskEpCbParam* pParam = malloc(sizeof(SMqAskEpCbParam)); if (pParam == NULL) { tscError("failed to malloc subscribe param"); - goto END; + free(req); + return -1; } pParam->tmq = tmq; pParam->sync = sync; tsem_init(&pParam->rspSem, 0, 0); - SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + SMsgSendInfo* sendInfo = malloc(sizeof(SMsgSendInfo)); + if (sendInfo == NULL) { + tsem_destroy(&pParam->rspSem); + free(pParam); + free(req); + return -1; + } + + sendInfo->msgInfo = (SDataBuf){ + .pData = req, + .len = tlen, + .handle = NULL, + }; + + sendInfo->requestId = generateRequestId(); sendInfo->requestObjRefId = 0; sendInfo->param = pParam; sendInfo->fp = tmqAskEpCb; + sendInfo->msgType = TDMT_MND_GET_SUB_EP; SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); -END: if (sync) tsem_wait(&pParam->rspSem); return 0; } @@ -804,6 +865,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j); if (pVg->vgId == pOffset->vgId) { pVg->currentOffset = pOffset->offset; + tmqClearUnhandleMsg(tmq); return TMQ_RESP_ERR__SUCCESS; } } @@ -812,7 +874,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { return TMQ_RESP_ERR__FAIL; } -SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClientTopic* pTopic, SMqClientVg* pVg) { +SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) { int64_t reqOffset; if (pVg->currentOffset >= 0) { reqOffset = pVg->currentOffset; @@ -832,7 +894,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie strcpy(pReq->topic, pTopic->topicName); strcpy(pReq->cgroup, tmq->groupId); - pReq->blockingTime = blocking_time; + pReq->blockingTime = blockingTime; pReq->consumerId = tmq->consumerId; pReq->epoch = tmq->epoch; pReq->currentOffset = reqOffset; @@ -842,28 +904,76 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie return pReq; } -void tmqClearUnhandleMsg(tmq_t* tmq) { - tmq_message_t* msg; - while (1) { - taosGetQitem(tmq->qall, (void**)&msg); - if (msg) - taosFreeQitem(msg); - else - break; - } +tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { + tmq_message_t* msg = NULL; + for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); + /*if (vgStatus != TMQ_VG_STATUS__IDLE) {*/ + /*continue;*/ + /*}*/ + SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); + if (pReq == NULL) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + // TODO: out of mem + return NULL; + } - taosReadAllQitems(tmq->mqueue, tmq->qall); - while (1) { - taosGetQitem(tmq->qall, (void**)&msg); - if (msg) - taosFreeQitem(msg); - else - break; + SMqPollCbParam* pParam = malloc(sizeof(SMqPollCbParam)); + if (pParam == NULL) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + // TODO: out of mem + return NULL; + } + pParam->tmq = tmq; + pParam->pVg = pVg; + pParam->epoch = tmq->epoch; + pParam->sync = 1; + pParam->msg = &msg; + tsem_init(&pParam->rspSem, 0, 0); + + SMsgSendInfo* sendInfo = malloc(sizeof(SMsgSendInfo)); + if (sendInfo == NULL) { + return NULL; + } + + sendInfo->msgInfo = (SDataBuf){ + .pData = pReq, + .len = sizeof(SMqConsumeReq), + .handle = NULL, + }; + sendInfo->requestId = generateRequestId(); + sendInfo->requestObjRefId = 0; + sendInfo->param = pParam; + sendInfo->fp = tmqPollCb; + sendInfo->msgType = TDMT_VND_CONSUME; + + int64_t transporterId = 0; + /*printf("send poll\n");*/ + atomic_add_fetch_32(&tmq->waitingRequest, 1); + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + pVg->pollCnt++; + tmq->pollCnt++; + + tsem_wait(&pParam->rspSem); + tmq_message_t* nmsg = NULL; + while (1) { + taosReadQitem(tmq->mqueue, (void**)&nmsg); + if (nmsg == NULL) continue; + while (nmsg->head.mqMsgType != TMQ_MSG_TYPE__POLL_RSP) { + taosReadQitem(tmq->mqueue, (void**)&nmsg); + } + return nmsg; + } + } } + return NULL; } int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { - printf("call poll\n"); + /*printf("call poll\n");*/ for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { @@ -875,32 +985,44 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); if (pReq == NULL) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - // TODO: out of mem + tsem_post(&tmq->rspSem); return -1; } - SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam)); - if (param == NULL) { + SMqPollCbParam* pParam = malloc(sizeof(SMqPollCbParam)); + if (pParam == NULL) { + free(pReq); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - // TODO: out of mem + tsem_post(&tmq->rspSem); return -1; } - param->tmq = tmq; - param->pVg = pVg; - param->epoch = tmq->epoch; - SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); - pRequest->body.requestMsg = (SDataBuf){ + pParam->tmq = tmq; + pParam->pVg = pVg; + pParam->epoch = tmq->epoch; + pParam->sync = 0; + + SMsgSendInfo* sendInfo = malloc(sizeof(SMsgSendInfo)); + if (sendInfo == NULL) { + free(pReq); + free(pParam); + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + tsem_post(&tmq->rspSem); + return -1; + } + + sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL, }; - - SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->requestId = generateRequestId(); sendInfo->requestObjRefId = 0; - sendInfo->param = param; + sendInfo->param = pParam; sendInfo->fp = tmqPollCb; + sendInfo->msgType = TDMT_VND_CONSUME; int64_t transporterId = 0; - printf("send poll\n"); + /*printf("send poll\n");*/ + atomic_add_fetch_32(&tmq->waitingRequest, 1); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); pVg->pollCnt++; tmq->pollCnt++; @@ -912,7 +1034,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { // return int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) { if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__EP_RSP) { - printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch); + /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/ if (rspMsg->head.epoch > atomic_load_32(&tmq->epoch)) { tmqUpdateEp(tmq, rspMsg->head.epoch, &rspMsg->getEpRsp); tmqClearUnhandleMsg(tmq); @@ -931,23 +1053,26 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese tmq_message_t* rspMsg = NULL; taosGetQitem(tmq->qall, (void**)&rspMsg); if (rspMsg == NULL) { - break; + taosReadAllQitems(tmq->mqueue, tmq->qall); + taosGetQitem(tmq->qall, (void**)&rspMsg); + if (rspMsg == NULL) return NULL; } if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { - printf("handle poll rsp %d\n", rspMsg->head.mqMsgType); + atomic_sub_fetch_32(&tmq->readyRequest, 1); + /*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/ if (rspMsg->head.epoch == atomic_load_32(&tmq->epoch)) { - printf("epoch match\n"); + /*printf("epoch match\n");*/ SMqClientVg* pVg = rspMsg->extra; pVg->currentOffset = rspMsg->consumeRsp.rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); return rspMsg; } else { - printf("epoch mismatch\n"); + /*printf("epoch mismatch\n");*/ taosFreeQitem(rspMsg); } } else { - printf("handle ep rsp %d\n", rspMsg->head.mqMsgType); + /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/ bool reset = false; tmqHandleRes(tmq, rspMsg, &reset); taosFreeQitem(rspMsg); @@ -957,36 +1082,57 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese } } } - return NULL; +} + +tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) { + tmq_message_t* rspMsg = NULL; + int64_t startTime = taosGetTimestampMs(); + + int64_t status = atomic_load_64(&tmq->status); + tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT); + + while (1) { + rspMsg = tmqSyncPollImpl(tmq, blocking_time); + if (rspMsg && rspMsg->consumeRsp.numOfTopics) { + return rspMsg; + } + + if (blocking_time != 0) { + int64_t endTime = taosGetTimestampMs(); + if (endTime - startTime > blocking_time) { + return NULL; + } + } else + return NULL; + } } tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { - tmq_message_t* rspMsg = NULL; + tmq_message_t* rspMsg; int64_t startTime = taosGetTimestampMs(); // TODO: put into another thread or delayed queue int64_t status = atomic_load_64(&tmq->status); tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT); - taosGetQitem(tmq->qall, (void**)&rspMsg); - if (rspMsg == NULL) { - taosReadAllQitems(tmq->mqueue, tmq->qall); + rspMsg = tmqHandleAllRsp(tmq, blocking_time, false); + if (rspMsg) { + return rspMsg; } - tmqHandleAllRsp(tmq, blocking_time, false); - - tmqPollImpl(tmq, blocking_time); while (1) { /*printf("cycle\n");*/ - taosReadAllQitems(tmq->mqueue, tmq->qall); - rspMsg = tmqHandleAllRsp(tmq, blocking_time, true); + tmqPollImpl(tmq, blocking_time); + + tsem_wait(&tmq->rspSem); + + rspMsg = tmqHandleAllRsp(tmq, blocking_time, false); if (rspMsg) { return rspMsg; } if (blocking_time != 0) { int64_t endTime = taosGetTimestampMs(); if (endTime - startTime > blocking_time) { - printf("normal exit\n"); return NULL; } } @@ -1127,6 +1273,7 @@ void tmq_message_destroy(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; tDeleteSMqConsumeRsp(pRsp); + /*free(tmq_message);*/ taosFreeQitem(tmq_message); } @@ -1138,6 +1285,7 @@ const char* tmq_err2str(tmq_resp_err_t err) { } return "fail"; } + #if 0 tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) { tmq_t* pTmq = malloc(sizeof(tmq_t)); diff --git a/source/common/src/tep.c b/source/common/src/tdatablock.c similarity index 99% rename from source/common/src/tep.c rename to source/common/src/tdatablock.c index e2880441be..5b7557b749 100644 --- a/source/common/src/tep.c +++ b/source/common/src/tdatablock.c @@ -14,7 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "tep.h" +#include "tdatablock.h" #include "tcompare.h" #include "tglobal.h" diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 264e54aafe..a521e10cc7 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -17,7 +17,7 @@ #include "tglobal.h" #include "tcompare.h" #include "tconfig.h" -#include "tep.h" +#include "tdatablock.h" #include "tlog.h" SConfig *tsCfg = NULL; diff --git a/source/common/src/ttszip.c b/source/common/src/ttszip.c index f20e911681..464c29d287 100644 --- a/source/common/src/ttszip.c +++ b/source/common/src/ttszip.c @@ -46,7 +46,7 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) { } if (!autoDelete) { - remove(pTSBuf->path); + taosRemoveFile(pTSBuf->path); } if (NULL == allocResForTSBuf(pTSBuf)) { @@ -178,7 +178,7 @@ void* tsBufDestroy(STSBuf* pTSBuf) { if (pTSBuf->autoDelete) { // ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path); - remove(pTSBuf->path); + taosRemoveFile(pTSBuf->path); } else { // tscDebug("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path); } diff --git a/source/common/test/commonTests.cpp b/source/common/test/commonTests.cpp index 563efe4d54..7d54e5150c 100644 --- a/source/common/test/commonTests.cpp +++ b/source/common/test/commonTests.cpp @@ -8,7 +8,8 @@ #pragma GCC diagnostic ignored "-Wsign-compare" #include "os.h" -#include "tep.h" +#include "tcommon.h" +#include "tdatablock.h" #include "tcommon.h" #include "taos.h" #include "tvariant.h" diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index c7c30af057..4ca6b97ad4 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -23,9 +23,11 @@ extern "C" { #include "os.h" #include "cJSON.h" +#include "monitor.h" #include "tcache.h" #include "tcrc32c.h" -#include "tep.h" +#include "tdatablock.h" +#include "tglobal.h" #include "thash.h" #include "tlockfree.h" #include "tlog.h" @@ -35,8 +37,6 @@ extern "C" { #include "tthread.h" #include "ttime.h" #include "tworker.h" -#include "tglobal.h" -#include "monitor.h" #include "dnode.h" diff --git a/source/dnode/mgmt/impl/test/sut/src/client.cpp b/source/dnode/mgmt/impl/test/sut/src/client.cpp index b89cb02834..f22bc9d276 100644 --- a/source/dnode/mgmt/impl/test/sut/src/client.cpp +++ b/source/dnode/mgmt/impl/test/sut/src/client.cpp @@ -14,7 +14,7 @@ */ #include "sut.h" -#include "tep.h" +#include "tdatablock.h" static void processClientRsp(void* parent, SRpcMsg* pRsp, SEpSet* pEpSet) { TestClient* client = (TestClient*)parent; diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 71448feb73..0fe24cd757 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -20,7 +20,7 @@ #include "sdb.h" #include "tcache.h" -#include "tep.h" +#include "tdatablock.h" #include "tglobal.h" #include "tqueue.h" #include "ttime.h" diff --git a/source/dnode/vnode/inc/meta.h b/source/dnode/vnode/inc/meta.h index c98a97d0dc..fd079b8f32 100644 --- a/source/dnode/vnode/inc/meta.h +++ b/source/dnode/vnode/inc/meta.h @@ -24,8 +24,8 @@ extern "C" { #endif -#define META_SUPER_TABLE TD_SUPER_TABLE -#define META_CHILD_TABLE TD_CHILD_TABLE +#define META_SUPER_TABLE TD_SUPER_TABLE +#define META_CHILD_TABLE TD_CHILD_TABLE #define META_NORMAL_TABLE TD_NORMAL_TABLE // Types exported @@ -50,14 +50,14 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid); int metaCommit(SMeta *pMeta); // For Query -STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); -STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); +STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); +STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); -STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); +STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); -char * metaTbCursorNext(SMTbCursor *pTbCur); +char *metaTbCursorNext(SMTbCursor *pTbCur); SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid); void metaCloseCtbCurosr(SMCtbCursor *pCtbCur); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 7718e0886d..319bd9fde6 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -213,6 +213,10 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SA //} static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) { + if (pHandle->tbIdHash) { + taosHashClear(pHandle->tbIdHash); + } + pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); if (pHandle->tbIdHash == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -227,6 +231,23 @@ static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const S return 0; } +static FORCE_INLINE int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) { + if (pHandle->tbIdHash == NULL) { + pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + if (pHandle->tbIdHash == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } + + for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { + int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i); + taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); + } + + return 0; +} + int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReadHandle *pHandle); int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo); diff --git a/source/dnode/vnode/src/inc/metaDef.h b/source/dnode/vnode/src/inc/metaDef.h index f4128f7170..71bfd91356 100644 --- a/source/dnode/vnode/src/inc/metaDef.h +++ b/source/dnode/vnode/src/inc/metaDef.h @@ -76,4 +76,4 @@ struct SMeta { } #endif -#endif /*_TD_META_DEF_H_*/ \ No newline at end of file +#endif /*_TD_META_DEF_H_*/ diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index 3c464391d2..f75d9d3db0 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -62,10 +62,10 @@ static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT * static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg); -static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); +static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); static void metaClearTbCfg(STbCfg *pTbCfg); static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW); -static void * metaDecodeSchema(void *buf, SSchemaWrapper *pSW); +static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW); static void metaDBWLock(SMetaDB *pDB); static void metaDBRLock(SMetaDB *pDB); static void metaDBULock(SMetaDB *pDB); @@ -142,7 +142,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { tb_uid_t uid; char buf[512]; char buf1[512]; - void * pBuf; + void *pBuf; DBT key1, value1; DBT key2, value2; SSchema *pSchema = NULL; @@ -394,7 +394,7 @@ static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { STbCfg *pTbCfg = (STbCfg *)(pValue->app_data); - DBT * pDbt; + DBT *pDbt; if (pTbCfg->type == META_CHILD_TABLE) { // pDbt = calloc(2, sizeof(DBT)); @@ -479,7 +479,7 @@ static void metaClearTbCfg(STbCfg *pTbCfg) { /* ------------------------ FOR QUERY ------------------------ */ STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) { - STbCfg * pTbCfg = NULL; + STbCfg *pTbCfg = NULL; SMetaDB *pDB = pMeta->pDB; DBT key = {0}; DBT value = {0}; @@ -509,7 +509,7 @@ STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) { } STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) { - STbCfg * pTbCfg = NULL; + STbCfg *pTbCfg = NULL; SMetaDB *pDB = pMeta->pDB; DBT key = {0}; DBT pkey = {0}; @@ -543,10 +543,10 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) { SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) { uint32_t nCols; SSchemaWrapper *pSW = NULL; - SMetaDB * pDB = pMeta->pDB; + SMetaDB *pDB = pMeta->pDB; int ret; - void * pBuf; - SSchema * pSchema; + void *pBuf; + SSchema *pSchema; SSchemaKey schemaKey = {uid, sver, 0}; DBT key = {0}; DBT value = {0}; @@ -578,7 +578,7 @@ struct SMTbCursor { SMTbCursor *metaOpenTbCursor(SMeta *pMeta) { SMTbCursor *pTbCur = NULL; - SMetaDB * pDB = pMeta->pDB; + SMetaDB *pDB = pMeta->pDB; pTbCur = (SMTbCursor *)calloc(1, sizeof(*pTbCur)); if (pTbCur == NULL) { @@ -609,7 +609,7 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) { DBT key = {0}; DBT value = {0}; STbCfg tbCfg; - void * pBuf; + void *pBuf; for (;;) { if (pTbCur->pCur->get(pTbCur->pCur, &key, &value, DB_NEXT) == 0) { @@ -631,10 +631,10 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) { STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { STSchemaBuilder sb; - STSchema * pTSchema = NULL; - SSchema * pSchema; + STSchema *pTSchema = NULL; + SSchema *pSchema; SSchemaWrapper *pSW; - STbCfg * pTbCfg; + STbCfg *pTbCfg; tb_uid_t quid; pTbCfg = metaGetTbInfoByUid(pMeta, uid); @@ -662,13 +662,13 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { } struct SMCtbCursor { - DBC * pCur; + DBC *pCur; tb_uid_t suid; }; SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { SMCtbCursor *pCtbCur = NULL; - SMetaDB * pDB = pMeta->pDB; + SMetaDB *pDB = pMeta->pDB; int ret; pCtbCur = (SMCtbCursor *)calloc(1, sizeof(*pCtbCur)); @@ -700,7 +700,7 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) { DBT skey = {0}; DBT pkey = {0}; DBT pval = {0}; - void * pBuf; + void *pBuf; STbCfg tbCfg; // Set key diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index aa198d0806..ccad006657 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -72,6 +72,8 @@ void tqClose(STQ* pTq) { } int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { + // if waiting + // memcpy and send msg to fetch thread // TODO: add reference // if handle waiting, launch query and response to consumer // @@ -210,7 +212,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { SMqConsumeReq* pReq = pMsg->pCont; int64_t consumerId = pReq->consumerId; int64_t fetchOffset; - /*int64_t blockingTime = pReq->blockingTime;*/ + int64_t blockingTime = pReq->blockingTime; if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { fetchOffset = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 7a95820115..37403f1a11 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -1054,7 +1054,7 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { // pfs->metaCacheComp = NULL; // } else { // // remove meta.tmp file -// remove(mf.f.aname); +// taosRemoveFile(mf.f.aname); // taosHashCleanup(pfs->metaCacheComp); // pfs->metaCacheComp = NULL; // } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 411a166caa..a03739c90f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -439,7 +439,7 @@ static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) { if (taosWriteFile(pFile, hbuf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { terrno = TAOS_SYSTEM_ERROR(errno); taosCloseFile(&pFile); - remove(tfname); + taosRemoveFile(tfname); return -1; } @@ -447,7 +447,7 @@ static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) { if (fsheader.len > 0) { if (tsdbMakeRoom(&(pBuf), fsheader.len) < 0) { taosCloseFile(&pFile); - remove(tfname); + taosRemoveFile(tfname); return -1; } @@ -458,7 +458,7 @@ static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) { if (taosWriteFile(pFile, pBuf, fsheader.len) < fsheader.len) { terrno = TAOS_SYSTEM_ERROR(errno); taosCloseFile(&pFile); - (void)remove(tfname); + (void)taosRemoveFile(tfname); taosTZfree(pBuf); return -1; } @@ -468,7 +468,7 @@ static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) { if (taosFsyncFile(pFile) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); taosCloseFile(&pFile); - remove(tfname); + taosRemoveFile(tfname); taosTZfree(pBuf); return -1; } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index b05fc7812a..aeac6b3eee 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -27,13 +27,13 @@ #include "os.h" #include "tglobal.h" #include "catalog.h" +#include "catalogInt.h" #include "stub.h" #include "taos.h" +#include "tdatablock.h" #include "tdef.h" -#include "tep.h" #include "trpc.h" #include "tvariant.h" -#include "catalogInt.h" namespace { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index f75720dc86..991cd372c3 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -15,12 +15,12 @@ #ifndef TDENGINE_EXECUTORIMPL_H #define TDENGINE_EXECUTORIMPL_H -#include "tsort.h" #ifdef __cplusplus extern "C" { #endif #include "os.h" +#include "tsort.h" #include "tcommon.h" #include "tlosertree.h" #include "ttszip.h" @@ -157,6 +157,13 @@ typedef struct STaskCostInfo { SHashObj* operatorProfResults; // map } STaskCostInfo; +typedef struct SOperatorCostInfo { + uint64_t openCost; + uint64_t execCost; + uint64_t totalRows; + uint64_t totalBytes; +} SOperatorCostInfo; + typedef struct { int64_t vgroupLimit; int64_t ts; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b90ce275d2..19a3874f75 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -95,17 +95,17 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { } int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo* )tinfo; + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; // traverse to the streamscan node to add this table id SOperatorInfo* pInfo = pTaskInfo->pRoot; - while(pInfo->operatorType != OP_StreamScan) { + while (pInfo->operatorType != OP_StreamScan) { pInfo = pInfo->pDownstream[0]; } SStreamBlockScanInfo* pScanInfo = pInfo->info; if (isAdd) { - int32_t code = tqReadHandleSetTbUidList(pScanInfo->readerHandle, tableIdList); + int32_t code = tqReadHandleAddTbUidList(pScanInfo->readerHandle, tableIdList); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -114,4 +114,4 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA } return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 461235f2ab..fabaa2d31d 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -179,13 +179,6 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { return pTaskInfo->code; } -void* qGetResultRetrieveMsg(qTaskInfo_t qinfo) { - SQInfo* pQInfo = (SQInfo*) qinfo; - assert(pQInfo != NULL); - - return pQInfo->rspContext; -} - int32_t qKillTask(qTaskInfo_t qinfo) { SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo; @@ -221,7 +214,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) { int32_t qIsTaskCompleted(qTaskInfo_t qinfo) { SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo; - if (pTaskInfo == NULL /*|| !isValidQInfo(pTaskInfo)*/) { + if (pTaskInfo == NULL) { return TSDB_CODE_QRY_INVALID_QHANDLE; } @@ -235,33 +228,3 @@ void qDestroyTask(qTaskInfo_t qTaskHandle) { queryCostStatis(pTaskInfo); // print the query cost summary doDestroyTask(pTaskInfo); } - -#if 0 -//kill by qid -int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount) { - int32_t error = TSDB_CODE_SUCCESS; - void** handle = qAcquireTask(pMgmt, qId); - if(handle == NULL) return terrno; - - SQInfo* pQInfo = (SQInfo*)(*handle); - if (pQInfo == NULL || !isValidQInfo(pQInfo)) { - return TSDB_CODE_QRY_INVALID_QHANDLE; - } - qWarn("%s be killed(no memory commit).", pQInfo->qId); - setTaskKilled(pQInfo); - - // wait query stop - int32_t loop = 0; - while (pQInfo->owner != 0) { - taosMsleep(waitMs); - if(loop++ > waitCount){ - error = TSDB_CODE_FAILED; - break; - } - } - - qReleaseTask(pMgmt, (void **)&handle, true); - return error; -} - -#endif diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ae79f23272..3ebad151fd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -15,12 +15,12 @@ #include "os.h" -#include "tep.h" -#include "tsort.h" -#include "texception.h" #include "parser.h" +#include "tdatablock.h" +#include "texception.h" #include "tglobal.h" #include "tmsg.h" +#include "tsort.h" #include "ttime.h" #include "executorimpl.h" @@ -8730,10 +8730,8 @@ static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type static int64_t getQuerySupportBufSize(size_t numOfTables) { size_t s1 = sizeof(STableQueryInfo); - size_t s2 = sizeof(SHashNode); - // size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb - return (int64_t)((s1 + s2) * 1.5 * numOfTables); + return (int64_t)(s1* 1.5 * numOfTables); } int32_t checkForQueryBuf(size_t numOfTables) { diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index d042dc0eff..34dd248ba7 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -16,11 +16,11 @@ #include "tcommon.h" #include "query.h" -#include "tsort.h" -#include "tep.h" +#include "tdatablock.h" #include "tdef.h" #include "tlosertree.h" #include "tpagedbuf.h" +#include "tsort.h" #include "tutil.h" typedef struct STupleHandle { diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 2a41f306f7..ff29d5f355 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -29,7 +29,7 @@ #include "taos.h" #include "tdef.h" #include "tvariant.h" -#include "tep.h" +#include "tdatablock.h" #include "trpc.h" #include "stub.h" #include "executor.h" diff --git a/source/libs/executor/test/sortTests.cpp b/source/libs/executor/test/sortTests.cpp index 612b0705ea..1956794395 100644 --- a/source/libs/executor/test/sortTests.cpp +++ b/source/libs/executor/test/sortTests.cpp @@ -29,8 +29,8 @@ #include "executor.h" #include "stub.h" #include "taos.h" +#include "tdatablock.h" #include "tdef.h" -#include "tep.h" #include "trpc.h" #include "tvariant.h" diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index c36fc9c659..7fb63f5910 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -28,8 +28,8 @@ #include "tbuffer.h" #include "tcompression.h" //#include "queryLog.h" +#include "tdatablock.h" #include "tudf.h" -#include "tep.h" #define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput)) #define GET_INPUT_DATA(x, y) ((char*) colDataGetData((x)->pInput, (y))) diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 6cb5142dd8..d47954ac1f 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -96,7 +96,7 @@ typedef struct SIndexTermQuery { typedef struct Iterate Iterate; typedef struct IterateValue { - int8_t type; + int8_t type; // opera type, ADD_VALUE/DELETE_VALUE char* colVal; SArray* val; } IterateValue; diff --git a/source/libs/index/inc/index_util.h b/source/libs/index/inc/index_util.h index 985dd657d0..313839bf1d 100644 --- a/source/libs/index/inc/index_util.h +++ b/source/libs/index/inc/index_util.h @@ -54,7 +54,22 @@ extern "C" { * output:[4, 5] */ void iIntersection(SArray *interResults, SArray *finalResult); + +/* multi sorted result intersection + * input: [1, 2, 4, 5] + * [2, 3, 4, 5] + * [1, 4, 5] + * output:[1, 2, 3, 4, 5] + */ void iUnion(SArray *interResults, SArray *finalResult); + +/* sorted array + * total: [1, 2, 4, 5, 7, 8] + * except: [4, 5] + * return: [1, 2, 7, 8] + */ + +void iExcept(SArray *total, SArray *except); #ifdef __cplusplus } #endif diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 168e819073..75dc05fc5b 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -16,6 +16,7 @@ #include "index.h" #include "indexInt.h" #include "index_cache.h" +#include "index_comm.h" #include "index_tfile.h" #include "index_util.h" #include "tdef.h" @@ -30,8 +31,18 @@ void* indexQhandle = NULL; -static char JSON_COLUMN[] = "JSON"; - +#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \ + { \ + bool f = false; \ + for (int i = 0; i < taosArrayGetSize(src); i++) { \ + if (*(uint64_t*)taosArrayGet(src, i) == tgt) { \ + f = true; \ + } \ + } \ + if (f == false) { \ + taosArrayPush(dst, &tgt); \ + } \ + } void indexInit() { // refactor later indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); @@ -52,6 +63,12 @@ typedef struct SIdxColInfo { int cVersion; } SIdxColInfo; +typedef struct SIdxTempResult { + SArray* total; + SArray* added; + SArray* deled; +} SIdxTempResult; + static pthread_once_t isInit = PTHREAD_ONCE_INIT; // static void indexInit(); static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result); @@ -62,8 +79,7 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch); // merge cache and tfile by opera type -static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv); -static void indexMergeSameKey(SArray* result, TFileValue* tv); +static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTempResult* helper); // static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); // int32_t indexSerialKey(ICacheKey* key, char* buf); @@ -379,7 +395,6 @@ static void indexInterResultsDestroy(SArray* results) { static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* fResults) { // refactor, merge interResults into fResults by oType - for (int i = 0; i < taosArrayGetSize(interResults); i--) { SArray* t = taosArrayGetP(interResults, i); taosArraySort(t, uidCompare); @@ -398,44 +413,82 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType return 0; } -static void indexMergeSameKey(SArray* result, TFileValue* tv) { - int32_t sz = result ? taosArrayGetSize(result) : 0; +SIdxTempResult* sIdxTempResultCreate() { + SIdxTempResult* tr = calloc(1, sizeof(SIdxTempResult)); + tr->total = taosArrayInit(4, sizeof(uint64_t)); + tr->added = taosArrayInit(4, sizeof(uint64_t)); + tr->deled = taosArrayInit(4, sizeof(uint64_t)); + return tr; +} +void sIdxTempResultClear(SIdxTempResult* tr) { + if (tr == NULL) { + return; + } + taosArrayClear(tr->total); + taosArrayClear(tr->added); + taosArrayClear(tr->deled); +} +void sIdxTempResultDestroy(SIdxTempResult* tr) { + if (tr == NULL) { + return; + } + taosArrayDestroy(tr->total); + taosArrayDestroy(tr->added); + taosArrayDestroy(tr->deled); +} +static void sIdxTempResultMergeTo(SArray* result, SIdxTempResult* tr) { + taosArraySort(tr->total, uidCompare); + taosArraySort(tr->added, uidCompare); + taosArraySort(tr->deled, uidCompare); + + SArray* arrs = taosArrayInit(2, sizeof(void*)); + taosArrayPush(arrs, &tr->total); + taosArrayPush(arrs, &tr->added); + + iUnion(arrs, result); + taosArrayDestroy(arrs); + + iExcept(result, tr->deled); +} +static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTempResult* tr) { + int32_t sz = taosArrayGetSize(result); if (sz > 0) { - // TODO(yihao): remove duplicate tableid TFileValue* lv = taosArrayGetP(result, sz - 1); - // indexError("merge colVal: %s", lv->colVal); - if (strcmp(lv->colVal, tv->colVal) == 0) { - taosArrayAddAll(lv->tableId, tv->tableId); - tfileValueDestroy(tv); + if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) { + sIdxTempResultMergeTo(lv->tableId, tr); + sIdxTempResultClear(tr); + + taosArrayPush(result, &tfv); + } else if (tfv == NULL) { + // handle last iterator + sIdxTempResultMergeTo(lv->tableId, tr); } else { - taosArrayPush(result, &tv); + // temp result saved in help + tfileValueDestroy(tfv); } } else { - taosArrayPush(result, &tv); + taosArrayPush(result, &tfv); } } -static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv) { - // opt - char* colVal = (cv != NULL) ? cv->colVal : tv->colVal; - // design merge-algorithm later, too complicated to handle all kind of situation +static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTempResult* tr) { + char* colVal = (cv != NULL) ? cv->colVal : tv->colVal; TFileValue* tfv = tfileValueCreate(colVal); + + indexMayMergeTempToFinalResult(result, tfv, tr); + if (cv != NULL) { + uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0); if (cv->type == ADD_VALUE) { - taosArrayAddAll(tfv->tableId, cv->val); + INDEX_MERGE_ADD_DEL(tr->deled, tr->added, id) } else if (cv->type == DEL_VALUE) { - } else if (cv->type == UPDATE_VALUE) { - } else { - // do nothing + INDEX_MERGE_ADD_DEL(tr->added, tr->deled, id) } } if (tv != NULL) { - // opt later - taosArrayAddAll(tfv->tableId, tv->val); + taosArrayAddAll(tr->total, tv->val); } - - indexMergeSameKey(result, tfv); } -static void indexDestroyTempResult(SArray* result) { +static void indexDestroyFinalResult(SArray* result) { int32_t sz = result ? taosArrayGetSize(result) : 0; for (size_t i = 0; i < sz; i++) { TFileValue* tv = taosArrayGetP(result, i); @@ -443,6 +496,7 @@ static void indexDestroyTempResult(SArray* result) { } taosArrayDestroy(result); } + int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { if (sIdx == NULL) { return -1; @@ -467,6 +521,8 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { bool cn = cacheIter ? cacheIter->next(cacheIter) : false; bool tn = tfileIter ? tfileIter->next(tfileIter) : false; + + SIdxTempResult* tr = sIdxTempResultCreate(); while (cn == true || tn == true) { IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL; IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL; @@ -480,19 +536,22 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { comp = 1; } if (comp == 0) { - indexMergeCacheAndTFile(result, cv, tv); + indexMergeCacheAndTFile(result, cv, tv, tr); cn = cacheIter->next(cacheIter); tn = tfileIter->next(tfileIter); } else if (comp < 0) { - indexMergeCacheAndTFile(result, cv, NULL); + indexMergeCacheAndTFile(result, cv, NULL, tr); cn = cacheIter->next(cacheIter); } else { - indexMergeCacheAndTFile(result, NULL, tv); + indexMergeCacheAndTFile(result, NULL, tv, tr); tn = tfileIter->next(tfileIter); } } + indexMayMergeTempToFinalResult(result, NULL, tr); + sIdxTempResultDestroy(tr); + int ret = indexGenTFile(sIdx, pCache, result); - indexDestroyTempResult(result); + indexDestroyFinalResult(result); indexCacheDestroyImm(pCache); diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 599bac3fe6..1ac72e10a9 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -267,6 +267,10 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA SSkipListNode* node = tSkipListIterGet(iter); if (node != NULL) { CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); + // if (c->operaType == ADD_VALUE) { + //} else if (c->operaType == DEL_VALUE) { + //} + if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) { if (strcmp(c->colVal, ct->colVal) == 0) { taosArrayPush(result, &c->uid); @@ -411,17 +415,8 @@ static bool indexCacheIteratorNext(Iterate* itera) { SSkipListNode* node = tSkipListIterGet(iter); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); - // equal func - // if (iv->colVal != NULL && ct->colVal != NULL) { - // if (0 == strcmp(iv->colVal, ct->colVal)) { if (iv->type == ADD_VALUE) } - //} else { - // tIterVal.colVal = calloc(1, strlen(ct->colVal) + 1); - // tIterval.colVal = tstrdup(ct->colVal); - //} iv->type = ct->operaType; iv->colVal = tstrdup(ct->colVal); - // iv->colVal = calloc(1, strlen(ct->colVal) + 1); - // memcpy(iv->colVal, ct->colVal, strlen(ct->colVal)); taosArrayPush(iv->val, &ct->uid); } diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 0947c796b2..f5f46b0617 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -102,7 +102,6 @@ void tfileCacheDestroy(TFileCache* tcache) { if (tcache == NULL) { return; } - // free table cache TFileReader** reader = taosHashIterate(tcache->tableCache, NULL); while (reader) { @@ -429,6 +428,7 @@ static bool tfileIteratorNext(Iterate* iiter) { return false; } + iv->type = ADD_VALUE; // value in tfile always ADD_VALUE iv->colVal = colVal; return true; // std::string key(ch, sz); diff --git a/source/libs/index/src/index_util.c b/source/libs/index/src/index_util.c index fcaab968c2..fc28484de9 100644 --- a/source/libs/index/src/index_util.c +++ b/source/libs/index/src/index_util.c @@ -14,6 +14,7 @@ */ #include "index_util.h" #include "index.h" + typedef struct MergeIndex { int idx; int len; @@ -111,6 +112,26 @@ void iUnion(SArray *inters, SArray *final) { break; } } - tfree(mi); } + +void iExcept(SArray *total, SArray *except) { + int32_t tsz = taosArrayGetSize(total); + int32_t esz = taosArrayGetSize(except); + if (esz == 0 || tsz == 0) { + return; + } + + int vIdx = 0; + for (int i = 0; i < tsz; i++) { + uint64_t val = *(uint64_t *)taosArrayGet(total, i); + int idx = iBinarySearch(except, 0, esz - 1, val); + if (idx >= 0 && idx < esz && *(uint64_t *)taosArrayGet(except, idx) == val) { + continue; + } + taosArraySet(total, vIdx, &val); + vIdx += 1; + } + + taosArrayPopTailBatch(total, tsz - vIdx); +} diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index cb3206a611..618e20bc4b 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -19,7 +19,7 @@ static std::string fileName = "/tmp/tindex.tindex"; class FstWriter { public: FstWriter() { - remove(fileName.c_str()); + taosRemoveFile(fileName.c_str()); _wc = writerCtxCreate(TFile, fileName.c_str(), false, 64 * 1024 * 1024); _b = fstBuilderCreate(_wc, 0); } diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index e5c79d137f..df9f8b8439 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -105,6 +105,22 @@ TEST_F(JsonEnv, testWriteMillonData) { } indexMultiTermDestroy(terms); } + { + std::string colName("voltagefdadfa"); + std::string colVal("abxxxxxxxxxxxx"); + for (uint i = 0; i < 1000; i++) { + colVal[i % colVal.size()] = '0' + i % 128; + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 1000; i++) { + tIndexJsonPut(index, terms, i); + } + indexMultiTermDestroy(terms); + } + } { std::string colName("voltagefdadfa"); std::string colVal("abxxxxxxxxxxxx"); diff --git a/source/libs/index/test/utilUT.cc b/source/libs/index/test/utilUT.cc index aeff20d488..6ba8cc9525 100644 --- a/source/libs/index/test/utilUT.cc +++ b/source/libs/index/test/utilUT.cc @@ -224,3 +224,84 @@ TEST_F(UtilEnv, 04union) { iUnion(src, rslt); assert(taosArrayGetSize(rslt) == 12); } +TEST_F(UtilEnv, 01Except) { + SArray *total = taosArrayInit(4, sizeof(uint64_t)); + { + uint64_t arr1[] = {1, 4, 5, 6}; + for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) { + taosArrayPush(total, &arr1[i]); + } + } + + SArray *except = taosArrayInit(4, sizeof(uint64_t)); + { + uint64_t arr1[] = {1, 4, 5, 6}; + for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) { + taosArrayPush(except, &arr1[i]); + } + } + iExcept(total, except); + ASSERT_EQ(taosArrayGetSize(total), 0); + + taosArrayClear(total); + taosArrayClear(except); + + { + uint64_t arr1[] = {1, 4, 5, 6, 7, 8}; + for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) { + taosArrayPush(total, &arr1[i]); + } + } + + { + uint64_t arr1[] = {2, 4, 5, 6}; + for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) { + taosArrayPush(except, &arr1[i]); + } + } + iExcept(total, except); + ASSERT_EQ(taosArrayGetSize(total), 3); + + taosArrayClear(total); + taosArrayClear(except); + { + uint64_t arr1[] = {1, 4, 5, 6, 7, 8, 10, 100}; + for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) { + taosArrayPush(total, &arr1[i]); + } + } + + { + uint64_t arr1[] = {2, 4, 5, 6}; + for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) { + taosArrayPush(except, &arr1[i]); + } + } + iExcept(total, except); + ASSERT_EQ(taosArrayGetSize(total), 5); + ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 0), 1); + ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 1), 7); + ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 2), 8); + ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 3), 10); + ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 4), 100); + + taosArrayClear(total); + taosArrayClear(except); + { + uint64_t arr1[] = {1, 100}; + for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) { + taosArrayPush(total, &arr1[i]); + } + } + + { + uint64_t arr1[] = {2, 4, 5, 6}; + for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) { + taosArrayPush(except, &arr1[i]); + } + } + iExcept(total, except); + ASSERT_EQ(taosArrayGetSize(total), 2); + ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 0), 1); + ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 1), 100); +} diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index 00d64bd12a..cdb547ee1b 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -15,10 +15,10 @@ #include "mockCatalogService.h" -#include "tep.h" #include #include #include +#include "tdatablock.h" #include "tname.h" #include "ttypes.h" diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index a2165453d5..63fbf59c06 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -1,23 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + #include "os.h" -#include "tmsg.h" #include "query.h" #include "tglobal.h" -#include "tsched.h" +#include "tmsg.h" #include "trpc.h" +#include "tsched.h" -#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS) -#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS) +#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS) +#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS) static struct SSchema _s = { .colId = TSDB_TBNAME_COLUMN_INDEX, - .type = TSDB_DATA_TYPE_BINARY, + .type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, .name = "tbname", }; -const SSchema* tGetTbnameColumnSchema() { - return &_s; -} +const SSchema* tGetTbnameColumnSchema() { return &_s; } static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) { int32_t rowLen = 0; @@ -87,7 +100,7 @@ int32_t initTaskQueue() { double factor = 4.0; int32_t numOfThreads = TMAX((int)(tsNumOfCores * tsNumOfThreadsPerCore / factor), 2); - + int32_t queueSize = tsMaxConnections * 2; pTaskQueue = taosInitScheduler(queueSize, numOfThreads, "tsc"); if (NULL == pTaskQueue) { @@ -96,19 +109,21 @@ int32_t initTaskQueue() { } qDebug("task queue is initialized, numOfThreads: %d", numOfThreads); + return 0; } int32_t cleanupTaskQueue() { taosCleanUpScheduler(pTaskQueue); + return 0; } static void execHelper(struct SSchedMsg* pSchedMsg) { assert(pSchedMsg != NULL && pSchedMsg->ahandle != NULL); - __async_exec_fn_t execFn = (__async_exec_fn_t) pSchedMsg->ahandle; - int32_t code = execFn(pSchedMsg->thandle); + __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; + int32_t code = execFn(pSchedMsg->thandle); if (code != 0 && pSchedMsg->msg != NULL) { - *(int32_t*) pSchedMsg->msg = code; + *(int32_t*)pSchedMsg->msg = code; } } @@ -116,34 +131,33 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) assert(execFn != NULL); SSchedMsg schedMsg = {0}; - schedMsg.fp = execHelper; + schedMsg.fp = execHelper; schedMsg.ahandle = execFn; schedMsg.thandle = execParam; - schedMsg.msg = code; + schedMsg.msg = code; taosScheduleTask(pTaskQueue, &schedMsg); + return 0; } -int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { - char *pMsg = rpcMallocCont(pInfo->msgInfo.len); +int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { + char* pMsg = rpcMallocCont(pInfo->msgInfo.len); if (NULL == pMsg) { - qError("0x%"PRIx64" msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType)); + qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType)); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return terrno; } memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len); - SRpcMsg rpcMsg = { - .msgType = pInfo->msgType, - .pCont = pMsg, - .contLen = pInfo->msgInfo.len, - .ahandle = (void*) pInfo, - .handle = pInfo->msgInfo.handle, - .code = 0 - }; + SRpcMsg rpcMsg = {.msgType = pInfo->msgType, + .pCont = pMsg, + .contLen = pInfo->msgInfo.len, + .ahandle = (void*)pInfo, + .handle = pInfo->msgInfo.handle, + .code = 0}; assert(pInfo->fp != NULL); rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId); return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 94d4260696..231f0c7fff 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -33,7 +33,7 @@ #include "taos.h" #include "tdef.h" #include "tvariant.h" -#include "tep.h" +#include "tdatablock.h" #include "trpc.h" #include "planner.h" #include "qworker.h" diff --git a/source/libs/scalar/inc/filterInt.h b/source/libs/scalar/inc/filterInt.h index b4fe9a67ca..e1ffb2efd1 100644 --- a/source/libs/scalar/inc/filterInt.h +++ b/source/libs/scalar/inc/filterInt.h @@ -20,13 +20,13 @@ extern "C" { #endif +#include "query.h" +#include "querynodes.h" +#include "scalar.h" +#include "tcommon.h" +#include "tdatablock.h" #include "thash.h" #include "tname.h" -#include "tcommon.h" -#include "scalar.h" -#include "querynodes.h" -#include "query.h" -#include "tep.h" #define FILTER_DEFAULT_GROUP_SIZE 4 #define FILTER_DEFAULT_UNIT_SIZE 4 diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index e5be22c0a8..a7aea5a7e5 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -16,11 +16,11 @@ #include #include "thash.h" //#include "queryLog.h" -#include "tcompare.h" +#include "filter.h" #include "filterInt.h" #include "sclInt.h" -#include "filter.h" -#include "tep.h" +#include "tcompare.h" +#include "tdatablock.h" OptrStr gOptrStr[] = { {0, "invalid"}, diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 0245226dfc..b8cdda9ed1 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -1,11 +1,11 @@ -#include "nodes.h" -#include "tcommon.h" -#include "querynodes.h" #include "function.h" #include "functionMgt.h" -#include "sclvector.h" +#include "nodes.h" +#include "querynodes.h" #include "sclInt.h" -#include "tep.h" +#include "sclvector.h" +#include "tcommon.h" +#include "tdatablock.h" int32_t scalarGetOperatorParamNum(EOperatorType type) { if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type || OP_TYPE_IS_TRUE == type || OP_TYPE_IS_NOT_TRUE == type diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index 3c431ff33f..b066dd2e77 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -15,15 +15,15 @@ #include "os.h" -#include "ttypes.h" -#include "sclvector.h" -#include "tcompare.h" -#include "querynodes.h" +#include "filter.h" #include "filterInt.h" #include "query.h" +#include "querynodes.h" #include "sclInt.h" -#include "tep.h" -#include "filter.h" +#include "sclvector.h" +#include "tcompare.h" +#include "tdatablock.h" +#include "ttypes.h" //GET_TYPED_DATA(v, double, pRight->type, (char *)&((right)[i])); diff --git a/source/libs/scalar/test/filter/filterTests.cpp b/source/libs/scalar/test/filter/filterTests.cpp index 13829618a2..08210aa2f0 100644 --- a/source/libs/scalar/test/filter/filterTests.cpp +++ b/source/libs/scalar/test/filter/filterTests.cpp @@ -33,12 +33,18 @@ #include "taos.h" #include "tdef.h" #include "tvariant.h" -#include "tep.h" +#include "tdatablock.h" #include "stub.h" #include "scalar.h" -#include "nodes.h" -#include "tlog.h" #include "filter.h" +#include "nodes.h" +#include "scalar.h" +#include "stub.h" +#include "taos.h" +#include "tdatablock.h" +#include "tdef.h" +#include "tlog.h" +#include "tvariant.h" namespace { diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index 8eef1836a5..faf13f0a82 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -33,7 +33,7 @@ #include "taos.h" #include "tdef.h" #include "tvariant.h" -#include "tep.h" +#include "tdatablock.h" #include "stub.h" #include "scalar.h" #include "nodes.h" diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 1a34c20ba0..daf65cf251 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -34,8 +34,20 @@ #include "tvariant.h" #include "catalog.h" #include "scheduler.h" -#include "tep.h" +#include "taos.h" +#include "tdatablock.h" +#include "tdef.h" #include "trpc.h" +#include "tvariant.h" + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" +#pragma GCC diagnostic ignored "-Wreturn-type" +#pragma GCC diagnostic ignored "-Wformat" + #include "schedulerInt.h" #include "stub.h" #include "tref.h" diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 3ba145a96b..757718282a 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -14,7 +14,7 @@ */ #include "syncIO.h" -#include +#include #include "syncOnMessage.h" #include "tglobal.h" #include "ttimer.h" diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index 26fa90bdef..2579490791 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -202,7 +202,7 @@ void tfsDirname(const STfsFile *pFile, char *dest) { tstrncpy(dest, dirname(tname), TSDB_FILENAME_LEN); } -int32_t tfsRemoveFile(const STfsFile *pFile) { return remove(pFile->aname); } +int32_t tfsRemoveFile(const STfsFile *pFile) { return taosRemoveFile(pFile->aname); } int32_t tfsCopyFile(const STfsFile *pFile1, const STfsFile *pFile2) { return taosCopyFile(pFile1->aname, pFile2->aname); diff --git a/source/libs/transport/test/pushClient.c b/source/libs/transport/test/pushClient.c index 4842a0c800..f4babc9980 100644 --- a/source/libs/transport/test/pushClient.c +++ b/source/libs/transport/test/pushClient.c @@ -14,7 +14,7 @@ */ #include -#include +#include #include "os.h" #include "rpcLog.h" #include "taoserror.h" diff --git a/source/libs/transport/test/pushServer.c b/source/libs/transport/test/pushServer.c index a1c181ac98..c763b7bd8a 100644 --- a/source/libs/transport/test/pushServer.c +++ b/source/libs/transport/test/pushServer.c @@ -192,7 +192,7 @@ int main(int argc, char *argv[]) { if (pDataFile != NULL) { taosCloseFile(&pDataFile); - remove(dataName); + taosRemoveFile(dataName); } return 0; diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index bcdf32bf6a..6fc935cb61 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -14,7 +14,7 @@ */ #include -#include +#include #include "os.h" #include "rpcLog.h" #include "taoserror.h" diff --git a/source/libs/transport/test/rserver.c b/source/libs/transport/test/rserver.c index 5432a07649..da5284b5c5 100644 --- a/source/libs/transport/test/rserver.c +++ b/source/libs/transport/test/rserver.c @@ -181,7 +181,7 @@ int main(int argc, char *argv[]) { if (pDataFile != NULL) { taosCloseFile(&pDataFile); - remove(dataName); + taosRemoveFile(dataName); } return 0; diff --git a/source/libs/transport/test/syncClient.c b/source/libs/transport/test/syncClient.c index b7ef296b9d..f43fa7aae6 100644 --- a/source/libs/transport/test/syncClient.c +++ b/source/libs/transport/test/syncClient.c @@ -14,7 +14,7 @@ */ #include -#include +#include #include "os.h" #include "rpcLog.h" #include "taoserror.h" diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index 5c364a4c33..5edddb006b 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -15,10 +15,10 @@ #include #include #include -#include "tep.h" +#include "tdatablock.h" #include "tglobal.h" -#include "trpc.h" #include "tlog.h" +#include "trpc.h" using namespace std; const char *label = "APP"; diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index e64260c541..2a4f3497e4 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -379,7 +379,7 @@ int walSaveMeta(SWal* pWal) { // delete old file if (metaVer > -1) { walBuildMetaName(pWal, metaVer, fnameStr); - remove(fnameStr); + taosRemoveFile(fnameStr); } free(serialized); return 0; diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 4b1f0ba306..699da75790 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -55,9 +55,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) { int fileSetSize = taosArrayGetSize(pWal->fileInfoSet); for (int i = pWal->writeCur; i < fileSetSize; i++) { walBuildLogName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr); - remove(fnameStr); + taosRemoveFile(fnameStr); walBuildIdxName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr); - remove(fnameStr); + taosRemoveFile(fnameStr); } // pop from fileInfoSet taosArraySetSize(pWal->fileInfoSet, pWal->writeCur + 1); @@ -162,8 +162,8 @@ int32_t walEndSnapshot(SWal *pWal) { } // iterate files, until the searched result for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { - if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize) - || (pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) { + if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize) || + (pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) { // delete according to file size or close time deleteCnt++; newTotSize -= iter->fileSize; @@ -174,9 +174,9 @@ int32_t walEndSnapshot(SWal *pWal) { for (int i = 0; i < deleteCnt; i++) { SWalFileInfo *pInfo = taosArrayGet(pWal->fileInfoSet, i); walBuildLogName(pWal, pInfo->firstVer, fnameStr); - remove(fnameStr); + taosRemoveFile(fnameStr); walBuildIdxName(pWal, pInfo->firstVer, fnameStr); - remove(fnameStr); + taosRemoveFile(fnameStr); } // make new array, remove files @@ -279,6 +279,7 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in } else { // reject skip log or rewrite log // must truncate explicitly first + terrno = TSDB_CODE_WAL_INVALID_VER; return -1; } /*if (!tfValid(pWal->pWriteLogTFile)) return -1;*/ @@ -303,16 +304,18 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) { // ftruncate - code = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); + return -1; } if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) { // ftruncate - code = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); + return -1; } code = walWriteIndex(pWal, index, offset); @@ -329,7 +332,7 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in pthread_mutex_unlock(&pWal->mutex); - return code; + return 0; } void walFsync(SWal *pWal, bool forceFsync) { diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index b65a200ca1..230555e016 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -339,9 +339,9 @@ TEST_F(WalRetentionEnv, repairMeta1) { //getchar(); char buf[100]; sprintf(buf, "%s/meta-ver%d", pathName, 0); - remove(buf); + taosRemoveFile(buf); sprintf(buf, "%s/meta-ver%d", pathName, 1); - remove(buf); + taosRemoveFile(buf); SetUp(); //getchar(); diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c index e0f5f80dab..7d7382d83f 100644 --- a/source/os/src/osDir.c +++ b/source/os/src/osDir.c @@ -49,7 +49,7 @@ void taosRemoveDir(const char *dirname) { if (de->d_type & DT_DIR) { taosRemoveDir(filename); } else { - (void)remove(filename); + (void)taosRemoveFile(filename); //printf("file:%s is removed\n", filename); } } @@ -102,7 +102,7 @@ void taosRemoveOldFiles(const char *dirname, int32_t keepDays) { if (fileSec <= 100) continue; int32_t days = (int32_t)(TABS(sec - fileSec) / 86400 + 1); if (days > keepDays) { - (void)remove(filename); + (void)taosRemoveFile(filename); //printf("file:%s is removed, days:%d keepDays:%d", filename, days, keepDays); } else { //printf("file:%s won't be removed, days:%d keepDays:%d", filename, days, keepDays); diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 37004e9d70..652e0b5182 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -142,11 +142,13 @@ int64_t taosCopyFile(const char *from, const char *to) { _err: if (pFileFrom != NULL) taosCloseFile(&pFileFrom); if (pFileTo != NULL) taosCloseFile(&pFileTo); - remove(to); + taosRemoveFile(to); return -1; #endif } +int32_t taosRemoveFile(const char *path) { return remove(path); } + int32_t taosRenameFile(const char *oldName, const char *newName) { #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) int32_t code = MoveFileEx(oldName, newName, MOVEFILE_REPLACE_EXISTING | MOVEFILE_COPY_ALLOWED); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 176290ba38..9c2fef5b47 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +// clang-format off + #define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" @@ -408,6 +410,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Unexpected generic error in wal") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit") +TAOS_DEFINE_ERROR(TSDB_CODE_WAL_INVALID_VER, "WAL use invalid version") // tfs TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory") diff --git a/source/util/src/thash.c b/source/util/src/thash.c index efbd9adddf..8b437b9797 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -36,25 +36,35 @@ tfree(_n); \ } while (0); +struct SHashNode { + SHashNode *next; + uint32_t hashVal; // the hash value of key + uint32_t dataLen; // length of data + uint32_t keyLen; // length of the key + uint16_t refCount; // reference count + int8_t removed; // flag to indicate removed + char data[]; +}; + typedef struct SHashEntry { - int32_t num; // number of elements in current entry - SRWLatch latch; // entry latch - SHashNode *next; + int32_t num; // number of elements in current entry + SRWLatch latch; // entry latch + SHashNode *next; } SHashEntry; -typedef struct SHashObj { - SHashEntry **hashList; - size_t capacity; // number of slots - size_t size; // number of elements in hash table - _hash_fn_t hashFp; // hash function - _equal_fn_t equalFp; // equal function - _hash_free_fn_t freeFp; // hash node free callback function - SRWLatch lock; // read-write spin lock - SHashLockTypeE type; // lock type - bool enableUpdate; // enable update - SArray *pMemBlock; // memory block allocated for SHashEntry - _hash_before_fn_t callbackFp; // function invoked before return the value to caller -} SHashObj; +struct SHashObj { + SHashEntry ** hashList; + size_t capacity; // number of slots + size_t size; // number of elements in hash table + _hash_fn_t hashFp; // hash function + _equal_fn_t equalFp; // equal function + _hash_free_fn_t freeFp; // hash node free callback function + SRWLatch lock; // read-write spin lock + SHashLockTypeE type; // lock type + bool enableUpdate; // enable update + SArray * pMemBlock; // memory block allocated for SHashEntry + _hash_before_fn_t callbackFp; // function invoked before return the value to caller +}; /* * Function definition @@ -367,7 +377,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da // enable resize taosHashRUnlock(pHashObj); - return pHashObj->enableUpdate ? 0 : -1; + return pHashObj->enableUpdate ? 0 : -2; } } @@ -464,7 +474,7 @@ void* taosHashGetImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void** return data; } -int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t dsize) { +int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) { if (pHashObj == NULL || taosHashTableEmpty(pHashObj) || key == NULL || keyLen == 0) { return -1; } @@ -507,8 +517,6 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe prevNode->next = pNode->next; } - if (data) memcpy(data, GET_HASH_NODE_DATA(pNode), dsize); - pe->num--; atomic_sub_fetch_64(&pHashObj->size, 1); FREE_HASH_NODE(pNode); @@ -525,10 +533,6 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe return code; } -int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) { - return taosHashRemoveWithData(pHashObj, key, keyLen, NULL, 0); -} - void taosHashClear(SHashObj *pHashObj) { if (pHashObj == NULL) { return; diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 8afd04c3e6..7f5664b9cd 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -180,7 +180,7 @@ static void taosKeepOldLog(char *oldName) { char compressFileName[LOG_FILE_NAME_LEN + 20]; snprintf(compressFileName, LOG_FILE_NAME_LEN + 20, "%s.%" PRId64 ".gz", tsLogObj.logName, fileSec); if (taosCompressFile(fileName, compressFileName) == 0) { - (void)remove(fileName); + (void)taosRemoveFile(fileName); } } @@ -251,7 +251,7 @@ void taosResetLog() { tsLogObj.lines = tsLogObj.maxLines + 10; taosOpenNewLogFile(); - (void)remove(lastName); + (void)taosRemoveFile(lastName); uInfo("=================================="); uInfo(" reset log file "); diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 5bc4b81be7..8c2fd7809d 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -563,7 +563,7 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) { ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages)); } - remove(pBuf->path); + taosRemoveFile(pBuf->path); tfree(pBuf->path); SArray** p = taosHashIterate(pBuf->groupSet, NULL);