diff --git a/example/src/tmq.c b/example/src/tmq.c index d9b8c70b78..21f60ada5d 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -48,7 +48,6 @@ int32_t init_env() { return -1; } taos_free_result(pRes); - taosSsleep(1); pRes = taos_query(pConn, "use abc1"); if (taos_errno(pRes) != 0) { diff --git a/include/client/taos.h b/include/client/taos.h index 3cbf8a773b..6ff55aef19 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -27,10 +27,7 @@ typedef void TAOS; typedef void TAOS_STMT; typedef void TAOS_RES; typedef void **TAOS_ROW; -#if 0 -typedef void TAOS_STREAM; -#endif -typedef void TAOS_SUB; +typedef void TAOS_SUB; // Data type definition #define TSDB_DATA_TYPE_NULL 0 // 1 bytes @@ -196,12 +193,6 @@ DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub); DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress); #endif -#if 0 -DLL_EXPORT TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), - int64_t stime, void *param, void (*callback)(void *)); -DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); -#endif - DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList); DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision); @@ -241,12 +232,8 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); -DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); +DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); -#if 0 -DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups); -DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups); -#endif DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async); #if 0 DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); @@ -273,7 +260,7 @@ DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); // TODO #if 0 -DLL_EXPORT char *tmq_get_block_table_name(TAOS_RES *res); +DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res); #endif #if 0 diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 7c308f9354..00f9e39c7a 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -99,6 +99,15 @@ typedef struct SColumnInfoData { }; } SColumnInfoData; +typedef struct SQueryTableDataCond { + STimeWindow twindow; + int32_t order; // desc|asc order to iterate the data block + int32_t numOfCols; + SColumnInfo *colList; + bool loadExternalRows; // load external rows or not + int32_t type; // data block load type: +} SQueryTableDataCond; + void* blockDataDestroy(SSDataBlock* pBlock); int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock); void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock); @@ -229,7 +238,6 @@ typedef struct SResSchame { char name[TSDB_COL_NAME_LEN]; } SResSchema; -// TODO move away to executor.h typedef struct SExprBasicInfo { SResSchema resSchema; int16_t numOfParams; // argument value of each function diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index d7d53ad1d0..d534b0405f 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -221,7 +221,7 @@ int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw); * @param pRaw The raw data. * @return int32_t 0 for success, -1 for failure. */ -int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw); +int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw); /** * @brief Acquire a row from sdb diff --git a/include/libs/function/function.h b/include/libs/function/function.h index ef32533e5f..ec0d644aba 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -202,7 +202,7 @@ typedef struct SqlFunctionCtx { SPoint1 end; SFuncExecFuncs fpSet; SScalarFuncExecFuncs sfp; - SExprInfo *pExpr; + struct SExprInfo *pExpr; struct SDiskbasedBuf *pBuf; struct SSDataBlock *pSrcBlock; int32_t curBufPage; diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 0be27e6d3a..ffe435a8b2 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -158,6 +158,8 @@ typedef enum { int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); +bool syncEnvIsStart(); + extern int32_t sDebugFlag; //----------------------------------------- diff --git a/include/util/taoserror.h b/include/util/taoserror.h index f0c3cc4b14..2c249a2d8d 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -139,16 +139,9 @@ int32_t* taosGetErrno(); // mnode-common #define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300) #define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0301) -#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0302) -#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0303) -#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0304) -#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0305) -#define TSDB_CODE_MND_INVALID_OPTIONS TAOS_DEF_ERROR_CODE(0, 0x0306) -#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0307) -#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0308) -#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0309) -#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x030A) -#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x030B) +#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0302) +#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0303) +#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0304) // mnode-show #define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x0310) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index f9540bc8fc..66e10f8c6a 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -56,7 +56,7 @@ struct tmq_conf_t { int8_t autoCommit; int8_t resetOffset; uint16_t port; - uint16_t autoCommitInterval; + int32_t autoCommitInterval; char* ip; char* user; char* pass; @@ -76,17 +76,25 @@ struct tmq_t { char groupId[TSDB_CGROUP_LEN]; char clientId[256]; int8_t autoCommit; - int64_t consumerId; + int32_t autoCommitInterval; int32_t resetOffsetCfg; + int64_t consumerId; tmq_commit_cb* commit_cb; // status int8_t status; - int8_t epStatus; int32_t epoch; +#if 0 + int8_t epStatus; int32_t epSkipCnt; +#endif int64_t pollCnt; + // timer + tmr_h hbTimer; + tmr_h reportTimer; + tmr_h commitTimer; + // connection STscObj* pTscObj; @@ -111,6 +119,12 @@ enum { TMQ_CONSUMER_STATUS__READY, }; +enum { + TMQ_DELAYED_TASK__HB = 1, + TMQ_DELAYED_TASK__REPORT, + TMQ_DELAYED_TASK__COMMIT, +}; + typedef struct { // statistics int64_t pollCnt; @@ -280,6 +294,50 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) { return sprintf(dst, "%s:%d", topicName, vg); } +void tmqAssignDelayedHbTask(void* param, void* tmrId) { + tmq_t* tmq = (tmq_t*)param; + int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t)); + *pTaskType = TMQ_DELAYED_TASK__HB; + taosWriteQitem(tmq->delayedTask, pTaskType); +} + +void tmqAssignDelayedCommitTask(void* param, void* tmrId) { + tmq_t* tmq = (tmq_t*)param; + int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t)); + *pTaskType = TMQ_DELAYED_TASK__COMMIT; + taosWriteQitem(tmq->delayedTask, pTaskType); +} + +void tmqAssignDelayedReportTask(void* param, void* tmrId) { + tmq_t* tmq = (tmq_t*)param; + int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t)); + *pTaskType = TMQ_DELAYED_TASK__REPORT; + taosWriteQitem(tmq->delayedTask, pTaskType); +} + +int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { + STaosQall* qall = taosAllocateQall(); + taosReadAllQitems(tmq->delayedTask, qall); + while (1) { + int8_t* pTaskType = NULL; + taosGetQitem(qall, (void**)&pTaskType); + if (pTaskType == NULL) break; + + if (*pTaskType == TMQ_DELAYED_TASK__HB) { + tmqAskEp(tmq, false); + taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer); + } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { + tmq_commit(tmq, NULL, true); + taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer); + } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { + } else { + ASSERT(0); + } + } + taosFreeQall(qall); + return 0; +} + void tmqClearUnhandleMsg(tmq_t* tmq) { SMqRspWrapper* msg = NULL; while (1) { @@ -408,13 +466,15 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->status = TMQ_CONSUMER_STATUS__INIT; pTmq->pollCnt = 0; pTmq->epoch = 0; - pTmq->epStatus = 0; - pTmq->epSkipCnt = 0; + /*pTmq->epStatus = 0;*/ + /*pTmq->epSkipCnt = 0;*/ // set conf strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); - pTmq->autoCommit = conf->autoCommit; + /*pTmq->autoCommit = conf->autoCommit;*/ + pTmq->autoCommit = 0; + pTmq->autoCommitInterval = conf->autoCommitInterval; pTmq->commit_cb = conf->commit_cb; pTmq->resetOffsetCfg = conf->resetOffset; @@ -607,6 +667,14 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { taosMsleep(500); } + // init hb timer + tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer); + + // init auto commit timer + if (tmq->autoCommit) { + tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer); + } + code = 0; FAIL: if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree); @@ -909,7 +977,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { } END: - atomic_store_8(&tmq->epStatus, 0); + /*atomic_store_8(&tmq->epStatus, 0);*/ if (pParam->sync) { tsem_post(&pParam->rspSem); } @@ -918,6 +986,7 @@ END: int32_t tmqAskEp(tmq_t* tmq, bool sync) { int32_t code = 0; +#if 0 int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1); if (epStatus == 1) { int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1); @@ -925,11 +994,12 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) { if (epSkipCnt < 5000) return 0; } atomic_store_32(&tmq->epSkipCnt, 0); +#endif int32_t tlen = sizeof(SMqCMGetSubEpReq); SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen); if (req == NULL) { tscError("failed to malloc get subscribe ep buf"); - atomic_store_8(&tmq->epStatus, 0); + /*atomic_store_8(&tmq->epStatus, 0);*/ return -1; } req->consumerId = htobe64(tmq->consumerId); @@ -940,7 +1010,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) { if (pParam == NULL) { tscError("failed to malloc subscribe param"); taosMemoryFree(req); - atomic_store_8(&tmq->epStatus, 0); + /*atomic_store_8(&tmq->epStatus, 0);*/ return -1; } pParam->tmq = tmq; @@ -952,7 +1022,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) { tsem_destroy(&pParam->rspSem); taosMemoryFree(pParam); taosMemoryFree(req); - atomic_store_8(&tmq->epStatus, 0); + /*atomic_store_8(&tmq->epStatus, 0);*/ return -1; } @@ -1216,7 +1286,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { } while (1) { - tmqAskEp(tmq, false); + tmqHandleAllDelayedTask(tmq); tmqPollImpl(tmq, blocking_time); /*tsem_wait(&tmq->rspSem);*/ diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 09452b584d..5270bdeb46 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -79,7 +79,11 @@ int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRo if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { return pColumnInfoData->varmeta.length; } else { - return pColumnInfoData->info.bytes * numOfRows; + if (pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) { + return 0; + } else { + return pColumnInfoData->info.bytes * numOfRows; + } } } diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index 5983ba92ac..6d9ead4c87 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -68,6 +68,7 @@ static void dmSetSignalHandle() { static int32_t dmParseArgs(int32_t argc, char const *argv[]) { int32_t cmdEnvIndex = 0; + if (argc < 2) return 0; global.envCmd = taosMemoryMalloc(argc-1); memset(global.envCmd, 0, argc-1); for (int32_t i = 1; i < argc; ++i) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index cc56db354e..36a8173adf 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -126,6 +126,8 @@ typedef enum { DND_REASON_OTHERS } EDndReason; +typedef void (*TransCbFp)(SMnode* pMnode, void* param); + typedef struct { int32_t id; ETrnStage stage; @@ -148,6 +150,8 @@ typedef struct { int64_t dbUid; char dbname[TSDB_DB_FNAME_LEN]; char lastError[TSDB_TRANS_ERROR_LEN]; + TransCbFp transCbFp; + void* transCbParam; } STrans; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 5c1b0991be..2fcc82e861 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -44,6 +44,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); +void mndTransSetCb(STrans *pTrans, TransCbFp fp, void *param); void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); diff --git a/source/dnode/mnode/impl/src/mndAcct.c b/source/dnode/mnode/impl/src/mndAcct.c index 25bf20fa0d..cf4c41ee36 100644 --- a/source/dnode/mnode/impl/src/mndAcct.c +++ b/source/dnode/mnode/impl/src/mndAcct.c @@ -17,8 +17,8 @@ #include "mndAcct.h" #include "mndShow.h" -#define TSDB_ACCT_VER_NUMBER 1 -#define TSDB_ACCT_RESERVE_SIZE 128 +#define ACCT_VER_NUMBER 1 +#define ACCT_RESERVE_SIZE 128 static int32_t mndCreateDefaultAcct(SMnode *pMnode); static SSdbRaw *mndAcctActionEncode(SAcctObj *pAcct); @@ -55,6 +55,7 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) { acctObj.createdTime = taosGetTimestampMs(); acctObj.updateTime = acctObj.createdTime; acctObj.acctId = 1; + acctObj.status = 0; acctObj.cfg = (SAcctCfg){.maxUsers = INT32_MAX, .maxDbs = INT32_MAX, .maxStbs = INT32_MAX, @@ -79,7 +80,7 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) { static SSdbRaw *mndAcctActionEncode(SAcctObj *pAcct) { terrno = TSDB_CODE_OUT_OF_MEMORY; - SSdbRaw *pRaw = sdbAllocRaw(SDB_ACCT, TSDB_ACCT_VER_NUMBER, sizeof(SAcctObj) + TSDB_ACCT_RESERVE_SIZE); + SSdbRaw *pRaw = sdbAllocRaw(SDB_ACCT, ACCT_VER_NUMBER, sizeof(SAcctObj) + ACCT_RESERVE_SIZE); if (pRaw == NULL) goto _OVER; int32_t dataPos = 0; @@ -100,7 +101,7 @@ static SSdbRaw *mndAcctActionEncode(SAcctObj *pAcct) { SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxTopics, _OVER) SDB_SET_INT64(pRaw, dataPos, pAcct->cfg.maxStorage, _OVER) SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.accessState, _OVER) - SDB_SET_RESERVE(pRaw, dataPos, TSDB_ACCT_RESERVE_SIZE, _OVER) + SDB_SET_RESERVE(pRaw, dataPos, ACCT_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) terrno = 0; @@ -122,7 +123,7 @@ static SSdbRow *mndAcctActionDecode(SSdbRaw *pRaw) { int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; - if (sver != TSDB_ACCT_VER_NUMBER) { + if (sver != ACCT_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; goto _OVER; } @@ -151,7 +152,7 @@ static SSdbRow *mndAcctActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &pAcct->cfg.maxTopics, _OVER) SDB_GET_INT64(pRaw, dataPos, &pAcct->cfg.maxStorage, _OVER) SDB_GET_INT32(pRaw, dataPos, &pAcct->cfg.accessState, _OVER) - SDB_GET_RESERVE(pRaw, dataPos, TSDB_ACCT_RESERVE_SIZE, _OVER) + SDB_GET_RESERVE(pRaw, dataPos, ACCT_RESERVE_SIZE, _OVER) terrno = 0; @@ -178,7 +179,6 @@ static int32_t mndAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) { static int32_t mndAcctActionUpdate(SSdb *pSdb, SAcctObj *pOld, SAcctObj *pNew) { mTrace("acct:%s, perform update action, old row:%p new row:%p", pOld->acct, pOld, pNew); - pOld->updateTime = pNew->updateTime; pOld->status = pNew->status; memcpy(&pOld->cfg, &pNew->cfg, sizeof(SAcctCfg)); @@ -186,19 +186,19 @@ static int32_t mndAcctActionUpdate(SSdb *pSdb, SAcctObj *pOld, SAcctObj *pNew) { } static int32_t mndProcessCreateAcctReq(SNodeMsg *pReq) { - terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED; + terrno = TSDB_CODE_MSG_NOT_PROCESSED; mError("failed to process create acct request since %s", terrstr()); return -1; } static int32_t mndProcessAlterAcctReq(SNodeMsg *pReq) { - terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED; + terrno = TSDB_CODE_MSG_NOT_PROCESSED; mError("failed to process create acct request since %s", terrstr()); return -1; } static int32_t mndProcessDropAcctReq(SNodeMsg *pReq) { - terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED; + terrno = TSDB_CODE_MSG_NOT_PROCESSED; mError("failed to process create acct request since %s", terrstr()); return -1; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index b97be87422..15ebcf02db 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -620,7 +620,7 @@ static int32_t mndSetAlterDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOl SSdbRaw *pRedoRaw = mndDbActionEncode(pOld); if (pRedoRaw == NULL) return -1; if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; - if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_UPDATING) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1; return 0; } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 5fa5182079..3eeec61dbb 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -363,7 +363,7 @@ static int32_t mndProcessStatusReq(SNodeMsg *pReq) { pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; } mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion); - terrno = TSDB_CODE_MND_INVALID_MSG_VERSION; + terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE; goto PROCESS_STATUS_MSG_OVER; } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 15a70e3311..e09e297cf3 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -72,7 +72,7 @@ static int32_t mndRestoreWal(SMnode *pMnode) { } mTrace("wal:%" PRId64 ", will be restored, content:%p", ver, pHead->head.body); - if (sdbWriteNotFree(pSdb, (void *)pHead->head.body) < 0) { + if (sdbWriteWithoutFree(pSdb, (void *)pHead->head.body) < 0) { mError("failed to read wal from sdb since %s, ver:%" PRId64, terrstr(), ver); goto WAL_RESTORE_OVER; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 9e9a8b56c9..582b60f7b2 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -193,9 +193,9 @@ TRANS_ENCODE_OVER: static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; - SSdbRow * pRow = NULL; - STrans * pTrans = NULL; - char * pData = NULL; + SSdbRow *pRow = NULL; + STrans *pTrans = NULL; + char *pData = NULL; int32_t dataLen = 0; int8_t sver = 0; int32_t redoLogNum = 0; @@ -456,7 +456,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { } static STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) { - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; STrans *pTrans = sdbAcquire(pSdb, SDB_TRANS, &transId); if (pTrans == NULL) { terrno = TSDB_CODE_MND_TRANS_NOT_EXIST; @@ -574,6 +574,11 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen) { pTrans->rpcRspLen = contLen; } +void mndTransSetCb(STrans *pTrans, TransCbFp fp, void *param) { + pTrans->transCbFp = fp; + pTrans->transCbParam = param; +} + void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) { pTrans->dbUid = pDb->uid; memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN); @@ -626,7 +631,7 @@ static int32_t mndCheckTransCanBeStartedInParallel(SMnode *pMnode, STrans *pNewT if (mndIsBasicTrans(pNewTrans)) return 0; STrans *pTrans = NULL; - void * pIter = NULL; + void *pIter = NULL; int32_t code = 0; while (1) { @@ -707,6 +712,8 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { pNew->rpcRefId = pTrans->rpcRefId; pNew->rpcRsp = pTrans->rpcRsp; pNew->rpcRspLen = pTrans->rpcRspLen; + pNew->transCbFp = pTrans->transCbFp; + pNew->transCbParam = pTrans->transCbParam; pTrans->rpcRsp = NULL; pTrans->rpcRspLen = 0; @@ -830,14 +837,14 @@ HANDLE_ACTION_RSP_OVER: } static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) { - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; int32_t arraySize = taosArrayGetSize(pArray); if (arraySize == 0) return 0; for (int32_t i = 0; i < arraySize; ++i) { SSdbRaw *pRaw = taosArrayGetP(pArray, i); - int32_t code = sdbWriteNotFree(pSdb, pRaw); + int32_t code = sdbWriteWithoutFree(pSdb, pRaw); if (code != 0) { return code; } @@ -1117,6 +1124,11 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) { } mDebug("trans:%d, finished, code:0x%04x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes); + + if (pTrans->transCbFp != NULL) { + (*pTrans->transCbFp)(pMnode, pTrans->transCbParam); + } + return continueExec; } @@ -1205,11 +1217,11 @@ static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) { } static int32_t mndProcessKillTransReq(SNodeMsg *pReq) { - SMnode * pMnode = pReq->pNode; + SMnode *pMnode = pReq->pNode; SKillTransReq killReq = {0}; int32_t code = -1; - SUserObj * pUser = NULL; - STrans * pTrans = NULL; + SUserObj *pUser = NULL; + STrans *pTrans = NULL; if (tDeserializeSKillTransReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -1249,7 +1261,7 @@ KILL_OVER: void mndTransPullup(SMnode *pMnode) { STrans *pTrans = NULL; - void * pIter = NULL; + void *pIter = NULL; while (1) { pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans); @@ -1264,11 +1276,11 @@ void mndTransPullup(SMnode *pMnode) { static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->pNode; - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; STrans *pTrans = NULL; int32_t cols = 0; - char * pWrite; + char *pWrite; while (numOfRows < rows) { pShow->pIter = sdbFetch(pSdb, SDB_TRANS, pShow->pIter, (void **)&pTrans); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 75caef2336..41a309e941 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -368,7 +368,7 @@ int32_t mndProcessMsg(SNodeMsg *pMsg) { } if (isReq && (pRpc->contLen == 0 || pRpc->pCont == NULL)) { - terrno = TSDB_CODE_MND_INVALID_MSG_LEN; + terrno = TSDB_CODE_INVALID_MSG_LEN; mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); return -1; } diff --git a/source/dnode/mnode/impl/test/acct/acct.cpp b/source/dnode/mnode/impl/test/acct/acct.cpp index 0260b5f59e..b143594ec0 100644 --- a/source/dnode/mnode/impl/test/acct/acct.cpp +++ b/source/dnode/mnode/impl/test/acct/acct.cpp @@ -32,7 +32,7 @@ TEST_F(MndTestAcct, 01_Create_Acct) { SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_ACCT, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_MND_MSG_NOT_PROCESSED); + ASSERT_EQ(pRsp->code, TSDB_CODE_MSG_NOT_PROCESSED); } TEST_F(MndTestAcct, 02_Alter_Acct) { @@ -42,7 +42,7 @@ TEST_F(MndTestAcct, 02_Alter_Acct) { SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_ACCT, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_MND_MSG_NOT_PROCESSED); + ASSERT_EQ(pRsp->code, TSDB_CODE_MSG_NOT_PROCESSED); } TEST_F(MndTestAcct, 03_Drop_Acct) { @@ -52,5 +52,5 @@ TEST_F(MndTestAcct, 03_Drop_Acct) { SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_ACCT, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_MND_MSG_NOT_PROCESSED); + ASSERT_EQ(pRsp->code, TSDB_CODE_MSG_NOT_PROCESSED); } diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index f61899766e..a0885f2916 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -202,7 +202,7 @@ int32_t sdbReadFile(SSdb *pSdb) { break; } - code = sdbWriteNotFree(pSdb, pRaw); + code = sdbWriteWithoutFree(pSdb, pRaw); if (code != 0) { mError("failed to read file:%s since %s", file, terrstr()); goto PARSE_SDB_DATA_ERROR; @@ -263,7 +263,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { continue; } - sdbPrintOper(pSdb, pRow, "writeFile"); + sdbPrintOper(pSdb, pRow, "write"); SSdbRaw *pRaw = (*encodeFp)(pRow->pObj); if (pRaw != NULL) { diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index dc2c12a2c4..1158bedc21 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -51,7 +51,9 @@ const char *sdbTableName(ESdbType type) { case SDB_TOPIC: return "topic"; case SDB_VGROUP: - return "vgId"; + return "vgroup"; + case SDB_SMA: + return "sma"; case SDB_STB: return "stb"; case SDB_DB: @@ -86,13 +88,13 @@ void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) { EKeyType keyType = pSdb->keyTypes[pRow->type]; if (keyType == SDB_KEY_BINARY) { - mTrace("%s:%s, refCount:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), (char *)pRow->pObj, pRow->refCount, - oper, pRow->pObj, sdbStatusStr(pRow->status)); + mTrace("%s:%s, ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), (char *)pRow->pObj, pRow->refCount, oper, + pRow->pObj, sdbStatusStr(pRow->status)); } else if (keyType == SDB_KEY_INT32) { - mTrace("%s:%d, refCount:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int32_t *)pRow->pObj, - pRow->refCount, oper, pRow->pObj, sdbStatusStr(pRow->status)); + mTrace("%s:%d, ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int32_t *)pRow->pObj, pRow->refCount, + oper, pRow->pObj, sdbStatusStr(pRow->status)); } else if (keyType == SDB_KEY_INT64) { - mTrace("%s:%" PRId64 ", refCount:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int64_t *)pRow->pObj, + mTrace("%s:%" PRId64 ", ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int64_t *)pRow->pObj, pRow->refCount, oper, pRow->pObj, sdbStatusStr(pRow->status)); } else { } @@ -142,7 +144,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * pRow->refCount = 0; pRow->status = pRaw->status; - sdbPrintOper(pSdb, pRow, "insertRow"); + sdbPrintOper(pSdb, pRow, "insert"); if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) { taosWUnLockLatch(pLock); @@ -191,7 +193,7 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * SSdbRow *pOldRow = *ppOldRow; pOldRow->status = pRaw->status; - sdbPrintOper(pSdb, pOldRow, "updateRow"); + sdbPrintOper(pSdb, pOldRow, "update"); taosRUnLockLatch(pLock); int32_t code = 0; @@ -220,7 +222,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * SSdbRow *pOldRow = *ppOldRow; pOldRow->status = pRaw->status; - sdbPrintOper(pSdb, pOldRow, "deleteRow"); + sdbPrintOper(pSdb, pOldRow, "delete"); taosHashRemove(hash, pOldRow->pObj, keySize); taosWUnLockLatch(pLock); @@ -233,7 +235,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * return 0; } -int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) { +int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw) { SHashObj *hash = sdbGetHash(pSdb, pRaw->type); if (hash == NULL) return terrno; @@ -266,7 +268,7 @@ int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) { } int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) { - int32_t code = sdbWriteNotFree(pSdb, pRaw); + int32_t code = sdbWriteWithoutFree(pSdb, pRaw); sdbFreeRaw(pRaw); return code; } @@ -296,7 +298,7 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { case SDB_STATUS_UPDATING: atomic_add_fetch_32(&pRow->refCount, 1); pRet = pRow->pObj; - sdbPrintOper(pSdb, pRow, "acquireRow"); + sdbPrintOper(pSdb, pRow, "acquire"); break; case SDB_STATUS_CREATING: terrno = TSDB_CODE_SDB_OBJ_CREATING; @@ -318,7 +320,7 @@ static void sdbCheck(SSdb *pSdb, SSdbRow *pRow) { taosRLockLatch(pLock); int32_t ref = atomic_load_32(&pRow->refCount); - sdbPrintOper(pSdb, pRow, "checkRow"); + sdbPrintOper(pSdb, pRow, "check"); if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) { sdbFreeRow(pSdb, pRow); } @@ -330,13 +332,13 @@ void sdbRelease(SSdb *pSdb, void *pObj) { if (pObj == NULL) return; SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow)); - if (pRow->type >= SDB_MAX ) return; + if (pRow->type >= SDB_MAX) return; SRWLatch *pLock = &pSdb->locks[pRow->type]; taosRLockLatch(pLock); int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); - sdbPrintOper(pSdb, pRow, "releaseRow"); + sdbPrintOper(pSdb, pRow, "release"); if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) { sdbFreeRow(pSdb, pRow); } @@ -372,7 +374,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { } atomic_add_fetch_32(&pRow->refCount, 1); - sdbPrintOper(pSdb, pRow, "fetchRow"); + sdbPrintOper(pSdb, pRow, "fetch"); *ppObj = pRow->pObj; break; } diff --git a/source/dnode/mnode/sdb/src/sdbRaw.c b/source/dnode/mnode/sdb/src/sdbRaw.c index c3aaf562be..326fe53fc7 100644 --- a/source/dnode/mnode/sdb/src/sdbRaw.c +++ b/source/dnode/mnode/sdb/src/sdbRaw.c @@ -27,7 +27,7 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) { pRaw->sver = sver; pRaw->dataLen = dataLen; - mTrace("raw:%p, is created, len:%d", pRaw, dataLen); + mTrace("raw:%p, is created, len:%d table:%s", pRaw, dataLen, sdbTableName(type)); return pRaw; } diff --git a/source/dnode/mnode/sdb/src/sdbRow.c b/source/dnode/mnode/sdb/src/sdbRow.c index ac86a72155..94f87cb350 100644 --- a/source/dnode/mnode/sdb/src/sdbRow.c +++ b/source/dnode/mnode/sdb/src/sdbRow.c @@ -43,7 +43,7 @@ void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow) { (*deleteFp)(pSdb, pRow->pObj); } - sdbPrintOper(pSdb, pRow, "freeRow"); + sdbPrintOper(pSdb, pRow, "free"); mTrace("row:%p, is freed", pRow->pObj); taosMemoryFreeClear(pRow); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 162b5c7c3a..8984b3d8ae 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -91,16 +91,15 @@ int metaTbCursorNext(SMTbCursor *pTbCur); // tsdb typedef struct STsdb STsdb; -typedef struct STsdbQueryCond STsdbQueryCond; typedef void *tsdbReaderT; #define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 #define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_RR_ORDER 3 -tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, +tsdbReaderT *tsdbQueryTables(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, uint64_t taskId); -tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, +tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, void *pMemRef); int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo); bool isTsdbCacheLastRow(tsdbReaderT *pReader); @@ -112,6 +111,7 @@ bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg **pBlockStatis); SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); +void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond); void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo); @@ -172,15 +172,6 @@ struct SVnodeCfg { int8_t hashMethod; }; -struct STsdbQueryCond { - STimeWindow twindow; - int32_t order; // desc|asc order to iterate the data block - int32_t numOfCols; - SColumnInfo *colList; - bool loadExternalRows; // load external rows or not - int32_t type; // data block load type: -}; - typedef struct { TSKEY lastKey; uint64_t uid; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index de0dbe7167..2c8819f60b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -254,7 +254,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S assert(info.lastKey >= pTsdbReadHandle->window.skey && info.lastKey <= pTsdbReadHandle->window.ekey); } else { - assert(info.lastKey >= pTsdbReadHandle->window.ekey && info.lastKey <= pTsdbReadHandle->window.skey); + info.lastKey = pTsdbReadHandle->window.skey; } taosArrayPush(pTableCheckInfo, &info); @@ -317,7 +317,7 @@ static int64_t getEarliestValidTimestamp(STsdb* pTsdb) { return now - (tsTickPerDay[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick } -static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, STsdbQueryCond* pCond) { +static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableDataCond* pCond) { pTsdbReadHandle->window = pCond->twindow; bool updateTs = false; @@ -343,7 +343,7 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, STsdbQueryCond* } } -static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, uint64_t qId, uint64_t taskId) { +static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) { STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle)); if (pReadHandle == NULL) { goto _end; @@ -422,7 +422,7 @@ _end: return NULL; } -tsdbReaderT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, +tsdbReaderT* tsdbQueryTables(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, uint64_t taskId) { STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId); if (pTsdbReadHandle == NULL) { @@ -448,7 +448,7 @@ tsdbReaderT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo return (tsdbReaderT)pTsdbReadHandle; } -void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond* pCond) { +void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) { STsdbReadHandle* pTsdbReadHandle = queryHandle; if (emptyQueryTimewindow(pTsdbReadHandle)) { @@ -485,7 +485,7 @@ void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond* pCond) { resetCheckInfo(pTsdbReadHandle); } -void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond* pCond, STableGroupInfo* groupList) { +void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableGroupInfo* groupList) { STsdbReadHandle* pTsdbReadHandle = queryHandle; pTsdbReadHandle->order = pCond->order; @@ -526,7 +526,7 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond* pC // pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next); } -tsdbReaderT tsdbQueryLastRow(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, +tsdbReaderT tsdbQueryLastRow(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, uint64_t taskId) { pCond->twindow = updateLastrowForEachGroup(groupList); @@ -555,7 +555,7 @@ tsdbReaderT tsdbQueryLastRow(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo } #if 0 -tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) { +tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) { STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef); if (pTsdbReadHandle == NULL) { return NULL; @@ -618,8 +618,8 @@ static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGr return pNew; } -tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, - uint64_t taskId) { +tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, + uint64_t qId, uint64_t taskId) { STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList); if (pNew->numOfTables == 0) { @@ -1185,10 +1185,12 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* tsdbDebug("%p no data in mem, %s", pTsdbReadHandle, pTsdbReadHandle->idStr); } - if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) || - (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) { - if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) || - (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) { + bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order); + + if ((ascScan && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) || + (!ascScan && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) { + if ((ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) || + (!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) { // do not load file block into buffer int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; @@ -1225,8 +1227,8 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* assert(pTsdbReadHandle->outputCapacity >= binfo.rows); int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo); - if ((cur->pos == 0 && endPos == binfo.rows - 1 && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || - (cur->pos == (binfo.rows - 1) && endPos == 0 && (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)))) { + if ((cur->pos == 0 && endPos == binfo.rows - 1 && ascScan) || + (cur->pos == (binfo.rows - 1) && endPos == 0 && (!ascScan))) { pTsdbReadHandle->realNumOfRows = binfo.rows; cur->rows = binfo.rows; @@ -1234,7 +1236,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* cur->mixBlock = false; cur->blockCompleted = true; - if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { + if (ascScan) { cur->lastKey = binfo.window.ekey + 1; cur->pos = binfo.rows; } else { @@ -1382,8 +1384,6 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) { static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) { - int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; - SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; TSKEY* tsArray = pCols->cols[0].pData; @@ -1394,6 +1394,11 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t return numOfRows; } + bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order); + int32_t trueStart = ascScan ? start : end; + int32_t trueEnd = ascScan ? end : start; + int32_t step = ascScan ? 1 : -1; + int32_t requiredNumOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns); // data in buffer has greater timestamp, copy data in file block @@ -1411,7 +1416,7 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t if (!IS_VAR_DATA_TYPE(pColInfo->info.type)) { // todo opt performance // memmove(pData, (char*)src->pData + bytes * start, bytes * num); int32_t rowIndex = numOfRows; - for (int32_t k = start; k <= end; ++k, ++rowIndex) { + for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex) { SCellVal sVal = {0}; if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) { TASSERT(0); @@ -1427,7 +1432,7 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t int32_t rowIndex = numOfRows; // todo refactor, only copy one-by-one - for (int32_t k = start; k < num + start; ++k, ++rowIndex) { + for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex) { SCellVal sVal = {0}; if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) { TASSERT(0); @@ -1444,27 +1449,19 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t j++; i++; } else { // pColInfo->info.colId < src->colId, it is a NULL data - int32_t rowIndex = numOfRows; - for (int32_t k = start; k < num + start; ++k, ++rowIndex) { // TODO opt performance - colDataAppend(pColInfo, rowIndex, NULL, true); - } + colDataAppendNNULL(pColInfo, numOfRows, num); i++; } } while (i < requiredNumOfCols) { // the remain columns are all null data SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); - int32_t rowIndex = numOfRows; - - for (int32_t k = start; k < num + start; ++k, ++rowIndex) { - colDataAppend(pColInfo, rowIndex, NULL, - true); // TODO add a fast version to set a number of consecutive NULL value. - } + colDataAppendNNULL(pColInfo, numOfRows, num); i++; } - pTsdbReadHandle->cur.win.ekey = tsArray[end]; - pTsdbReadHandle->cur.lastKey = tsArray[end] + step; + pTsdbReadHandle->cur.win.ekey = tsArray[trueEnd]; + pTsdbReadHandle->cur.lastKey = tsArray[trueEnd] + step; return numOfRows + num; } @@ -2968,7 +2965,7 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) { // } // // // load the previous row -// STsdbQueryCond cond = {.numOfCols = numOfCols, .loadExternalRows = false, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER}; +// SQueryTableDataCond cond = {.numOfCols = numOfCols, .loadExternalRows = false, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER}; // if (type == TSDB_PREV_ROW) { // cond.order = TSDB_ORDER_DESC; // cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MIN}; @@ -3225,7 +3222,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, cur->rows, cur->win.skey, cur->win.ekey, pHandle->idStr); - pDataBlockInfo->uid = uid; + pDataBlockInfo->uid = uid; #if 0 // for multi-group data query processing test purpose @@ -3332,21 +3329,7 @@ SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) { return NULL; } - // todo refactor int32_t numOfRows = doCopyRowsFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1); - - // if the buffer is not full in case of descending order query, move the data in the front of the buffer - if (!ASCENDING_TRAVERSE(pHandle->order) && numOfRows < pHandle->outputCapacity) { - int32_t emptySize = pHandle->outputCapacity - numOfRows; - int32_t reqNumOfCols = (int32_t)taosArrayGetSize(pHandle->pColumns); - - for (int32_t i = 0; i < reqNumOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, i); - memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, - numOfRows * pColInfo->info.bytes); - } - } - return pHandle->pColumns; } } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 1d9885fe63..61cedc6b50 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -201,84 +201,89 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { // sync integration int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); - assert(pSyncNode != NULL); - ESyncState state = syncGetMyRole(pVnode->sync); - SyncTerm currentTerm = syncGetMyTerm(pVnode->sync); + if (syncEnvIsStart()) { - SMsgHead *pHead = pMsg->pCont; + SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); + assert(pSyncNode != NULL); - char logBuf[512]; - char *syncNodeStr = sync2SimpleStr(pVnode->sync); - snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); - syncRpcMsgLog2(logBuf, pMsg); - taosMemoryFree(syncNodeStr); + ESyncState state = syncGetMyRole(pVnode->sync); + SyncTerm currentTerm = syncGetMyTerm(pVnode->sync); - SRpcMsg *pRpcMsg = pMsg; + SMsgHead *pHead = pMsg->pCont; - if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) { - SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + char logBuf[512]; + char *syncNodeStr = sync2SimpleStr(pVnode->sync); + snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); + syncRpcMsgLog2(logBuf, pMsg); + taosMemoryFree(syncNodeStr); - syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); - syncTimeoutDestroy(pSyncMsg); + SRpcMsg *pRpcMsg = pMsg; - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) { - SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) { + SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); - syncNodeOnPingCb(pSyncNode, pSyncMsg); - syncPingDestroy(pSyncMsg); + syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); + syncTimeoutDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) { - SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) { + SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); - syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); - syncPingReplyDestroy(pSyncMsg); + syncNodeOnPingCb(pSyncNode, pSyncMsg); + syncPingDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) { - SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) { + SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); - syncNodeOnClientRequestCb(pSyncNode, pSyncMsg); - syncClientRequestDestroy(pSyncMsg); + syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); + syncPingReplyDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) { - SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) { + SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); - syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); - syncRequestVoteDestroy(pSyncMsg); + syncNodeOnClientRequestCb(pSyncNode, pSyncMsg); + syncClientRequestDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) { - SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) { + SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); - syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); - syncRequestVoteReplyDestroy(pSyncMsg); + syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); + syncRequestVoteDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) { - SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) { + SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); - syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); - syncAppendEntriesDestroy(pSyncMsg); + syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); + syncRequestVoteReplyDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) { - SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) { + SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); - syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); - syncAppendEntriesReplyDestroy(pSyncMsg); + syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); + syncAppendEntriesDestroy(pSyncMsg); + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) { + SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + + syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); + syncAppendEntriesReplyDestroy(pSyncMsg); + + } else { + vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); + } + + syncNodeRelease(pSyncNode); } else { - vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); + vError("==vnodeProcessSyncReq== error syncEnv stop"); } - - syncNodeRelease(pSyncNode); - return 0; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4163da398e..00ec92465f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -322,27 +322,32 @@ typedef struct SColMatchInfo { bool output; } SColMatchInfo; +typedef struct SScanInfo { + int32_t numOfAsc; + int32_t numOfDesc; +} SScanInfo; + typedef struct STableScanInfo { void* dataReader; + int32_t numOfBlocks; // extract basic running information. int32_t numOfSkipped; int32_t numOfBlockStatis; int64_t numOfRows; - int32_t order; // scan order - int32_t times; // repeat counts + int64_t elapsedTime; + int32_t prevGroupId; // previous table group id + SScanInfo scanInfo; int32_t current; - int32_t reverseTimes; // 0 by default - SNode* pFilterNode; // filter operator info - SqlFunctionCtx* pCtx; // next operator query context + SNode* pFilterNode; // filter operator info + SqlFunctionCtx* pCtx; // next operator query context SResultRowInfo* pResultRowInfo; int32_t* rowCellInfoOffset; SExprInfo* pExpr; SSDataBlock* pResBlock; SArray* pColMatchInfo; int32_t numOfOutput; - int64_t elapsedTime; - int32_t prevGroupId; // previous table group id + SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; double sampleRatio; // data block sample ratio, 1 by default @@ -623,6 +628,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList); void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win); +int32_t getTableScanOrder(SOperatorInfo* pOperator); void doSetOperatorCompleted(SOperatorInfo* pOperator); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); @@ -630,9 +636,9 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableScanOperatorInfo(void* pReaderHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime, - int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, - SInterval* pInterval, double ratio, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput, int32_t dataLoadFlag, const uint8_t* scanInfo, + SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 434591c4c3..3541013015 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -191,7 +191,7 @@ static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutpu static int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo* pTableQueryInfo); static void releaseQueryBuf(size_t numOfTables); static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); -// static STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win); +// static SQueryTableDataCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win); static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo); static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr); @@ -207,7 +207,6 @@ static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput); static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput); -static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput); static void destroyOperatorInfo(SOperatorInfo* pOperator); static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput); @@ -3588,8 +3587,8 @@ static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQuer #endif } -// STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win) { -// STsdbQueryCond cond = { +// SQueryTableDataCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win) { +// SQueryTableDataCond cond = { // .colList = pQueryAttr->tableCols, // .order = pQueryAttr->order.order, // .numOfCols = pQueryAttr->numOfCols, @@ -4681,7 +4680,18 @@ _error: return NULL; } -static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; } +int32_t getTableScanOrder(SOperatorInfo* pOperator) { + if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { + if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { + return TSDB_ORDER_ASC; + } else { + return getTableScanOrder(pOperator->pDownstream[0]); + } + } + + STableScanInfo* pTableScanInfo = pOperator->info; + return pTableScanInfo->cond.order; +} // this is a blocking operator static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { @@ -4885,6 +4895,76 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi return true; } +enum { + PROJECT_RETRIEVE_CONTINUE = 0x1, + PROJECT_RETRIEVE_DONE = 0x2, +}; + +static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) { + SProjectOperatorInfo* pProjectInfo = pOperator->info; + SOptrBasicInfo* pInfo = &pProjectInfo->binfo; + SSDataBlock* pRes = pInfo->pRes; + + if (pProjectInfo->curSOffset > 0) { + if (pProjectInfo->groupId == 0) { // it is the first group + pProjectInfo->groupId = pBlock->info.groupId; + blockDataCleanup(pInfo->pRes); + return PROJECT_RETRIEVE_CONTINUE; + } else if (pProjectInfo->groupId != pBlock->info.groupId) { + pProjectInfo->curSOffset -= 1; + + // ignore data block in current group + if (pProjectInfo->curSOffset > 0) { + blockDataCleanup(pInfo->pRes); + return PROJECT_RETRIEVE_CONTINUE; + } + } + + // set current group id of the project operator + pProjectInfo->groupId = pBlock->info.groupId; + } + + if (pProjectInfo->groupId != 0 && pProjectInfo->groupId != pBlock->info.groupId) { + pProjectInfo->curGroupOutput += 1; + if ((pProjectInfo->slimit.limit > 0) && (pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput)) { + pOperator->status = OP_EXEC_DONE; + blockDataCleanup(pRes); + + return PROJECT_RETRIEVE_DONE; + } + + // reset the value for a new group data + pProjectInfo->curOffset = 0; + pProjectInfo->curOutput = 0; + } + + // here we reach the start position, according to the limit/offset requirements. + + // set current group id + pProjectInfo->groupId = pBlock->info.groupId; + + if (pProjectInfo->curOffset >= pRes->info.rows) { + pProjectInfo->curOffset -= pRes->info.rows; + blockDataCleanup(pRes); + return PROJECT_RETRIEVE_CONTINUE; + } else if (pProjectInfo->curOffset < pRes->info.rows && pProjectInfo->curOffset > 0) { + blockDataTrimFirstNRows(pRes, pProjectInfo->curOffset); + pProjectInfo->curOffset = 0; + } + + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + + // check for the limitation in each group + if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) { + pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput); + } + + return PROJECT_RETRIEVE_DONE; + } else { // not full enough, continue to accumulate the output data in the buffer. + return PROJECT_RETRIEVE_CONTINUE; + } +} + static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) { SProjectOperatorInfo* pProjectInfo = pOperator->info; SOptrBasicInfo* pInfo = &pProjectInfo->binfo; @@ -4953,63 +5033,22 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) // } // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC, false); + int32_t order = getTableScanOrder(pOperator->pDownstream[0]); + + setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput, pProjectInfo->pPseudoColInfo); - if (pProjectInfo->curSOffset > 0) { - if (pProjectInfo->groupId == 0) { // it is the first group - pProjectInfo->groupId = pBlock->info.groupId; - blockDataCleanup(pInfo->pRes); - continue; - } else if (pProjectInfo->groupId != pBlock->info.groupId) { - pProjectInfo->curSOffset -= 1; - - // ignore data block in current group - if (pProjectInfo->curSOffset > 0) { - blockDataCleanup(pInfo->pRes); - continue; - } - } - - pProjectInfo->groupId = pBlock->info.groupId; - } - - if (pProjectInfo->groupId != 0 && pProjectInfo->groupId != pBlock->info.groupId) { - pProjectInfo->curGroupOutput += 1; - if ((pProjectInfo->slimit.limit > 0) && (pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput)) { - pOperator->status = OP_EXEC_DONE; - return NULL; - } - - // reset the value for a new group data - pProjectInfo->curOffset = 0; - pProjectInfo->curOutput = 0; - } - - pProjectInfo->groupId = pBlock->info.groupId; - - // todo extract method - if (pProjectInfo->curOffset < pInfo->pRes->info.rows && pProjectInfo->curOffset > 0) { - blockDataTrimFirstNRows(pInfo->pRes, pProjectInfo->curOffset); - pProjectInfo->curOffset = 0; - } else if (pProjectInfo->curOffset >= pInfo->pRes->info.rows) { - pProjectInfo->curOffset -= pInfo->pRes->info.rows; - blockDataCleanup(pInfo->pRes); + int32_t status = handleLimitOffset(pOperator, pBlock); + if (status == PROJECT_RETRIEVE_CONTINUE) { continue; - } - - if (pRes->info.rows >= pOperator->resultInfo.threshold) { + } else if (status == PROJECT_RETRIEVE_DONE) { break; } } - if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pInfo->pRes->info.rows >= pProjectInfo->limit.limit) { - pInfo->pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput); - } - pProjectInfo->curOutput += pInfo->pRes->info.rows; // copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); @@ -5661,8 +5700,7 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, - int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, - const STableGroupInfo* pTableGroupInfo) { + int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -6319,6 +6357,19 @@ static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOu static SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget); static SArray* createIndexMap(SNodeList* pNodeList); static SArray* extractPartitionColInfo(SNodeList* pNodeList); +static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode); + +static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) { + SInterval interval = { + .interval = pTableScanNode->interval, + .sliding = pTableScanNode->sliding, + .intervalUnit = pTableScanNode->intervalUnit, + .slidingUnit = pTableScanNode->slidingUnit, + .offset = pTableScanNode->offset, + }; + + return interval; +} SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { @@ -6329,7 +6380,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - int32_t numOfCols = 0; + int32_t numOfCols = 0; tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); if (pDataReader == NULL && terrno != 0) { return NULL; @@ -6339,16 +6390,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc); - SInterval interval = { - .interval = pTableScanNode->interval, - .sliding = pTableScanNode->sliding, - .intervalUnit = pTableScanNode->intervalUnit, - .slidingUnit = pTableScanNode->slidingUnit, - .offset = pTableScanNode->offset, - }; + SQueryTableDataCond cond = {0}; + int32_t code = initQueryTableDataCond(&cond, pTableScanNode); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } - return createTableScanOperatorInfo(pDataReader, pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC, - numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq[0], pTableScanNode->scanSeq[1], pColList, + SInterval interval = extractIntervalInfo(pTableScanNode); + return createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList, pResBlock, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; @@ -6369,10 +6418,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; - SSDataBlock* pResBlock = createResDataBlock(pSysScanPhyNode->scan.node.pOutputDataBlockDesc); + SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan; - struct SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan; - SArray* colList = extractScanColumnId(pScanNode->pScanCols); + SSDataBlock* pResBlock = createResDataBlock(pScanNode->node.pOutputDataBlockDesc); + SArray* colList = extractScanColumnId(pScanNode->pScanCols); SOperatorInfo* pOperator = createSysTableScanOperatorInfo( pHandle->meta, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, @@ -6493,38 +6542,47 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return pOptr; } -static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STableGroupInfo* pGroupInfo, - void* readHandle, uint64_t queryId, uint64_t taskId) { - STsdbQueryCond cond = {.loadExternalRows = false}; - cond.order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; - cond.numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols); - cond.colList = taosMemoryCalloc(cond.numOfCols, sizeof(SColumnInfo)); - if (cond.colList == NULL) { +static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) { + pCond->loadExternalRows = false; + + pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; + pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols); + pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo)); + if (pCond->colList == NULL) { terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; + return terrno; } - cond.twindow = pTableScanNode->scanRange; - cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER; - // cond.type = pTableScanNode->scanFlag; + pCond->twindow = pTableScanNode->scanRange; + +#if 1 + //todo work around a problem, remove it later + if ((pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey > pCond->twindow.ekey) || + (pCond->order == TSDB_ORDER_DESC && pCond->twindow.skey < pCond->twindow.ekey)) { + TSWAP(pCond->twindow.skey, pCond->twindow.ekey, int64_t); + } +#endif + + pCond->type = BLOCK_LOAD_OFFSET_SEQ_ORDER; + // pCond->type = pTableScanNode->scanFlag; int32_t j = 0; - for (int32_t i = 0; i < cond.numOfCols; ++i) { + for (int32_t i = 0; i < pCond->numOfCols; ++i) { STargetNode* pNode = (STargetNode*)nodesListGetNode(pTableScanNode->scan.pScanCols, i); SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; if (pColNode->colType == COLUMN_TYPE_TAG) { continue; } - cond.colList[j].type = pColNode->node.resType.type; - cond.colList[j].bytes = pColNode->node.resType.bytes; - cond.colList[j].colId = pColNode->colId; + pCond->colList[j].type = pColNode->node.resType.type; + pCond->colList[j].bytes = pColNode->node.resType.bytes; + pCond->colList[j].colId = pColNode->colId; j += 1; } - cond.numOfCols = j; - return tsdbQueryTables(readHandle, &cond, pGroupInfo, queryId, taskId); + pCond->numOfCols = j; + return TSDB_CODE_SUCCESS; } SArray* extractScanColumnId(SNodeList* pNodeList) { @@ -6742,7 +6800,13 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* goto _error; } - return createDataReaderImpl(pTableScanNode, pTableGroupInfo, pHandle->reader, queryId, taskId); + SQueryTableDataCond cond = {0}; + code = initQueryTableDataCond(&cond, pTableScanNode); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return tsdbQueryTables(pHandle->reader, &cond, pTableGroupInfo, queryId, taskId); _error: terrno = code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 64a28e48e6..277ab9bc6f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -245,6 +245,10 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols); + // reset the block to be 0 by default, this blockId is assigned by physical plan and is used by direct upstream + // operator. + pBlock->info.blockId = 0; + doFilter(pTableScanInfo->pFilterNode, pBlock); if (pBlock->info.rows == 0) { pCost->filterOutBlocks += 1; @@ -255,17 +259,15 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, return TSDB_CODE_SUCCESS; } -static void setupEnvForReverseScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) { - // reverse order time range +static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) { SET_REVERSE_SCAN_FLAG(pTableScanInfo); switchCtxOrder(pCtx, numOfOutput); - SWITCH_ORDER(pTableScanInfo->order); - setupQueryRangeForReverseScan(pTableScanInfo); + // setupQueryRangeForReverseScan(pTableScanInfo); - pTableScanInfo->times = 1; - pTableScanInfo->current = 0; - pTableScanInfo->reverseTimes = 0; + STimeWindow* pTWindow = &pTableScanInfo->cond.twindow; + TSWAP(pTWindow->skey, pTWindow->ekey, int64_t); + pTableScanInfo->cond.order = TSDB_ORDER_DESC; } static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { @@ -294,9 +296,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { continue; } - // reset the block to be 0 by default, this blockId is assigned by physical plan and is used by direct upstream - // operator. - pBlock->info.blockId = 0; return pBlock; } @@ -312,64 +311,71 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { return NULL; } - SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo; *newgroup = false; - while (pTableScanInfo->current < pTableScanInfo->times) { + while (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) { SSDataBlock* p = doTableScanImpl(pOperator, newgroup); if (p != NULL) { return p; } - if (++pTableScanInfo->current >= pTableScanInfo->times) { - if (pTableScanInfo->reverseTimes <= 0 /* || isTsdbCacheLastRow(pTableScanInfo->pTsdbReadHandle)*/) { - return NULL; - } else { - break; + pTableScanInfo->current += 1; + + if (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) { + setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); + pTableScanInfo->scanFlag = REPEAT_SCAN; + + STimeWindow* pWin = &pTableScanInfo->cond.twindow; + qDebug("%s start to repeat ascending order scan data blocks due to query func required, qrange:%" PRId64 + "-%" PRId64, + GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); + + // do prepare for the next round table scan operation + tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); + } + } + + int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc; + if (pTableScanInfo->current < total) { + if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) { + prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput); + tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); + } + + STimeWindow* pWin = &pTableScanInfo->cond.twindow; + qDebug("%s start to descending order scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, + GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); + + while (pTableScanInfo->current < total) { + SSDataBlock* p = doTableScanImpl(pOperator, newgroup); + if (p != NULL) { + return p; + } + + pTableScanInfo->current += 1; + + if (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) { + setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); + pTableScanInfo->scanFlag = REPEAT_SCAN; + + qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64 + "-%" PRId64, + GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); + + // do prepare for the next round table scan operation + tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); } } - - // do prepare for the next round table scan operation - // STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); - // tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond); - - setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); - pTableScanInfo->scanFlag = REPEAT_SCAN; - - // if (pResultRowInfo->size > 0) { - // pResultRowInfo->curPos = 0; - // } - - qDebug("%s start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, - GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); } - SSDataBlock* p = NULL; - // todo refactor - if (pTableScanInfo->reverseTimes > 0) { - setupEnvForReverseScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput); - // STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); - // tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond); - - qDebug("%s start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, - GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); - - if (pResultRowInfo->size > 0) { - // pResultRowInfo->curPos = pResultRowInfo->size - 1; - } - - p = doTableScanImpl(pOperator, newgroup); - } - - return p; + setTaskStatus(pTaskInfo, TASK_COMPLETED); + return NULL; } -SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int32_t numOfOutput, int32_t dataLoadFlag, - int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, +SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput, + int32_t dataLoadFlag, const uint8_t* scanInfo, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo) { - assert(repeatTime > 0); - STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -380,18 +386,19 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int return NULL; } + pInfo->cond = *pCond; + pInfo->scanInfo = (SScanInfo){.numOfAsc = scanInfo[0], .numOfDesc = scanInfo[1]}; + pInfo->interval = *pInterval; pInfo->sampleRatio = sampleRatio; pInfo->dataBlockLoadFlag = dataLoadFlag; pInfo->pResBlock = pResBlock; pInfo->pFilterNode = pCondition; pInfo->dataReader = pDataReader; - pInfo->times = repeatTime; - pInfo->reverseTimes = reverseTime; - pInfo->order = order; pInfo->current = 0; pInfo->scanFlag = MAIN_SCAN; pInfo->pColMatchInfo = pColMatchInfo; + pOperator->name = "TableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->blockingOptr = false; @@ -413,8 +420,6 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); pInfo->dataReader = pTsdbReadHandle; - pInfo->times = 1; - pInfo->reverseTimes = 0; pInfo->current = 0; pInfo->prevGroupId = -1; diff --git a/source/libs/scalar/inc/sclvector.h b/source/libs/scalar/inc/sclvector.h index adbdd13a84..f80ffc70a2 100644 --- a/source/libs/scalar/inc/sclvector.h +++ b/source/libs/scalar/inc/sclvector.h @@ -86,8 +86,10 @@ static FORCE_INLINE _getDoubleValue_fn_t getVectorDoubleValueFn(int32_t srcType) p = getVectorDoubleValue_JSON; } else if (srcType == TSDB_DATA_TYPE_BOOL) { p = getVectorDoubleValue_BOOL; + } else if (srcType == TSDB_DATA_TYPE_NULL) { + p = NULL; } else { - assert(0); + ASSERT(0); } return p; } diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 294afc8073..334c58c184 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -591,21 +591,25 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) { SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE); if (NULL == res) { - sclError("make value node failed"); - sclFreeParam(&output); + sclError("make value node failed"); + sclFreeParam(&output); ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; return DEAL_RES_ERROR; } - res->node.resType = node->node.resType; res->translate = true; - int32_t type = output.columnData->info.type; - if (IS_VAR_DATA_TYPE(type)) { // todo refactor - res->datum.p = output.columnData->pData; - output.columnData->pData = NULL; + if (colDataIsNull_s(output.columnData, 0)) { + res->node.resType.type = TSDB_DATA_TYPE_NULL; } else { - memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes); + res->node.resType = node->node.resType; + int32_t type = output.columnData->info.type; + if (IS_VAR_DATA_TYPE(type)) { // todo refactor + res->datum.p = output.columnData->pData; + output.columnData->pData = NULL; + } else { + memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes); + } } nodesDestroyNode(*pNode); @@ -628,7 +632,7 @@ EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) { if (QUERY_NODE_OPERATOR == nodeType(*pNode)) { return sclRewriteOperator(pNode, ctx); - } + } return DEAL_RES_CONTINUE; } @@ -636,7 +640,7 @@ EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) { EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) { SFunctionNode *node = (SFunctionNode *)pNode; SScalarParam output = {0}; - + ctx->code = sclExecFunction(node, ctx, &output); if (ctx->code) { return DEAL_RES_ERROR; @@ -653,7 +657,7 @@ EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) { EDealRes sclWalkLogic(SNode* pNode, SScalarCtx *ctx) { SLogicConditionNode *node = (SLogicConditionNode *)pNode; SScalarParam output = {0}; - + ctx->code = sclExecLogic(node, ctx, &output); if (ctx->code) { return DEAL_RES_ERROR; diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index f63e539a96..11a5bc9d0e 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -150,34 +150,36 @@ int64_t getVectorBigintValue_JSON(void *src, int32_t index){ _getBigintValue_fn_t getVectorBigintValueFn(int32_t srcType) { _getBigintValue_fn_t p = NULL; - if(srcType==TSDB_DATA_TYPE_TINYINT) { - p = getVectorBigintValue_TINYINT; - }else if(srcType==TSDB_DATA_TYPE_UTINYINT) { - p = getVectorBigintValue_UTINYINT; - }else if(srcType==TSDB_DATA_TYPE_SMALLINT) { - p = getVectorBigintValue_SMALLINT; - }else if(srcType==TSDB_DATA_TYPE_USMALLINT) { - p = getVectorBigintValue_USMALLINT; - }else if(srcType==TSDB_DATA_TYPE_INT) { - p = getVectorBigintValue_INT; - }else if(srcType==TSDB_DATA_TYPE_UINT) { - p = getVectorBigintValue_UINT; - }else if(srcType==TSDB_DATA_TYPE_BIGINT) { - p = getVectorBigintValue_BIGINT; - }else if(srcType==TSDB_DATA_TYPE_UBIGINT) { - p = getVectorBigintValue_UBIGINT; - }else if(srcType==TSDB_DATA_TYPE_FLOAT) { - p = getVectorBigintValue_FLOAT; - }else if(srcType==TSDB_DATA_TYPE_DOUBLE) { - p = getVectorBigintValue_DOUBLE; - }else if(srcType==TSDB_DATA_TYPE_TIMESTAMP) { - p = getVectorBigintValue_BIGINT; - }else if(srcType==TSDB_DATA_TYPE_BOOL) { - p = getVectorBigintValue_BOOL; - }else if(srcType==TSDB_DATA_TYPE_JSON) { - p = getVectorBigintValue_JSON; - }else { - assert(0); + if (srcType==TSDB_DATA_TYPE_TINYINT) { + p = getVectorBigintValue_TINYINT; + } else if (srcType==TSDB_DATA_TYPE_UTINYINT) { + p = getVectorBigintValue_UTINYINT; + } else if (srcType==TSDB_DATA_TYPE_SMALLINT) { + p = getVectorBigintValue_SMALLINT; + } else if (srcType==TSDB_DATA_TYPE_USMALLINT) { + p = getVectorBigintValue_USMALLINT; + } else if (srcType==TSDB_DATA_TYPE_INT) { + p = getVectorBigintValue_INT; + } else if (srcType==TSDB_DATA_TYPE_UINT) { + p = getVectorBigintValue_UINT; + } else if (srcType==TSDB_DATA_TYPE_BIGINT) { + p = getVectorBigintValue_BIGINT; + } else if (srcType==TSDB_DATA_TYPE_UBIGINT) { + p = getVectorBigintValue_UBIGINT; + } else if (srcType==TSDB_DATA_TYPE_FLOAT) { + p = getVectorBigintValue_FLOAT; + } else if (srcType==TSDB_DATA_TYPE_DOUBLE) { + p = getVectorBigintValue_DOUBLE; + } else if (srcType==TSDB_DATA_TYPE_TIMESTAMP) { + p = getVectorBigintValue_BIGINT; + } else if (srcType==TSDB_DATA_TYPE_BOOL) { + p = getVectorBigintValue_BOOL; + } else if (srcType==TSDB_DATA_TYPE_JSON) { + p = getVectorBigintValue_JSON; + } else if (srcType==TSDB_DATA_TYPE_NULL){ + p = NULL; + } else { + ASSERT(0); } return p; } @@ -1594,7 +1596,7 @@ _bin_scalar_fn_t getBinScalarOperatorFn(int32_t binFunctionId) { case OP_TYPE_JSON_CONTAINS: return vectorJsonContains; default: - assert(0); + ASSERT(0); return NULL; } } diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index e9550ff989..4f91990ccf 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -39,6 +39,8 @@ extern "C" { #define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) typedef struct SSyncEnv { + uint8_t isStart; + // tick timer tmr_h pEnvTickTimer; int32_t envTickTimerMS; diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index ff2d4d4d27..4b2bc4130a 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -26,6 +26,14 @@ static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv); static void syncEnvTick(void *param, void *tmrId); // -------------------------------- +bool syncEnvIsStart() { + if (gSyncEnv == NULL) { + return false; + } + + return atomic_load_8(&(gSyncEnv->isStart)); +} + int32_t syncEnvStart() { int32_t ret = 0; taosSeedRand(taosGetTimestampSec()); @@ -88,12 +96,15 @@ static SSyncEnv *doSyncEnvStart() { // start tmr thread pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV"); + + atomic_store_8(&(pSyncEnv->isStart), 1); return pSyncEnv; } static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { assert(pSyncEnv == gSyncEnv); if (pSyncEnv != NULL) { + atomic_store_8(&(pSyncEnv->isStart), 0); taosTmrCleanUp(pSyncEnv->pTimerManager); taosMemoryFree(pSyncEnv); } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 207b7d8421..d025ca3781 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -61,7 +61,6 @@ int32_t walRollback(SWal *pWal, int64_t ver) { walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr); TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ); - // TODO:change to deserialize function if (pIdxTFile == NULL) { taosThreadMutexUnlock(&pWal->mutex); return -1; @@ -73,7 +72,6 @@ int32_t walRollback(SWal *pWal, int64_t ver) { return -1; } // read idx file and get log file pos - // TODO:change to deserialize function SWalIdxEntry entry; if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { taosThreadMutexUnlock(&pWal->mutex); @@ -167,7 +165,7 @@ int32_t walEndSnapshot(SWal *pWal) { char fnameStr[WAL_FILE_LEN]; // remove file for (int i = 0; i < deleteCnt; i++) { - SWalFileInfo *pInfo = taosArrayGet(pWal->fileInfoSet, i); + pInfo = taosArrayGet(pWal->fileInfoSet, i); walBuildLogName(pWal, pInfo->firstVer, fnameStr); taosRemoveFile(fnameStr); walBuildIdxName(pWal, pInfo->firstVer, fnameStr); diff --git a/source/os/src/osSystem.c b/source/os/src/osSystem.c index 148529170c..62c1747619 100644 --- a/source/os/src/osSystem.c +++ b/source/os/src/osSystem.c @@ -39,11 +39,11 @@ void* taosLoadDll(const char* filename) { #else void* handle = dlopen(filename, RTLD_LAZY); if (!handle) { - //printf("load dll:%s failed, error:%s", filename, dlerror()); + // printf("load dll:%s failed, error:%s", filename, dlerror()); return NULL; } - //printf("dll %s loaded", filename); + // printf("dll %s loaded", filename); return handle; #endif @@ -59,17 +59,17 @@ void* taosLoadSym(void* handle, char* name) { char* error = NULL; if ((error = dlerror()) != NULL) { - //printf("load sym:%s failed, error:%s", name, dlerror()); + // printf("load sym:%s failed, error:%s", name, dlerror()); return NULL; } - //printf("sym %s loaded", name); + // printf("sym %s loaded", name); return sym; #endif } -void taosCloseDll(void* handle) { +void taosCloseDll(void* handle) { #if defined(WINDOWS) return; #elif defined(_TD_DARWIN_64) @@ -100,7 +100,7 @@ int taosSetConsoleEcho(bool on) { struct termios term; if (tcgetattr(STDIN_FILENO, &term) == -1) { - perror("Cannot get the attribution of the terminal"); + /*perror("Cannot get the attribution of the terminal");*/ return -1; } @@ -111,7 +111,7 @@ int taosSetConsoleEcho(bool on) { err = tcsetattr(STDIN_FILENO, TCSAFLUSH, &term); if (err == -1 || err == EINTR) { - printf("Cannot set the attribution of the terminal"); + /*printf("Cannot set the attribution of the terminal");*/ return -1; } @@ -154,7 +154,7 @@ void taosSetTerminalMode() { int32_t taosGetOldTerminalMode() { #if defined(WINDOWS) - + #else /* Make sure stdin is a terminal. */ if (!isatty(STDIN_FILENO)) { @@ -181,7 +181,7 @@ void taosResetTerminalMode() { #endif } -TdCmdPtr taosOpenCmd(const char *cmd) { +TdCmdPtr taosOpenCmd(const char* cmd) { if (cmd == NULL) return NULL; #ifdef WINDOWS return (TdCmdPtr)_popen(cmd, "r"); @@ -190,8 +190,8 @@ TdCmdPtr taosOpenCmd(const char *cmd) { #endif } -int64_t taosGetLineCmd(TdCmdPtr pCmd, char ** __restrict ptrBuf) { - if (pCmd == NULL || ptrBuf == NULL ) { +int64_t taosGetLineCmd(TdCmdPtr pCmd, char** __restrict ptrBuf) { + if (pCmd == NULL || ptrBuf == NULL) { return -1; } if (*ptrBuf != NULL) { @@ -219,7 +219,7 @@ int32_t taosEOFCmd(TdCmdPtr pCmd) { return feof((FILE*)pCmd); } -int64_t taosCloseCmd(TdCmdPtr *ppCmd) { +int64_t taosCloseCmd(TdCmdPtr* ppCmd) { if (ppCmd == NULL || *ppCmd == NULL) { return 0; } diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index 5bfef810c3..b864f48522 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -145,12 +145,6 @@ static int32_t cfgCheckAndSetDir(SConfigItem *pItem, const char *inputDir) { return -1; } - if (taosRealPath(fullDir, NULL, PATH_MAX) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to get realpath of dir:%s since %s", inputDir, terrstr()); - return -1; - } - taosMemoryFreeClear(pItem->str); pItem->str = strdup(fullDir); if (pItem->str == NULL) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 399a2255ac..d3059bffd0 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -144,17 +144,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt cl // mnode-common TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, "Cluster not ready") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, "Mnode not ready") TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_IN_PROGRESS, "Message is progressing") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_NEED_REPROCESSED, "Message need to be reprocessed") TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_RIGHTS, "Insufficient privilege for operation") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_OPTIONS, "Invalid mnode options") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONNECTION, "Invalid message connection") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_VERSION, "Incompatible protocol version") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_LEN, "Invalid message length") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_TYPE, "Invalid message type") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_SHELL_CONNS, "Too many connections") // mnode-show TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SHOWOBJ, "Data expired") diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index b8abd4ff10..ec847dedbb 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -135,7 +135,7 @@ echo "qDebugFlag 143" >> $TAOS_CFG echo "rpcDebugFlag 143" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG echo "uDebugFlag 143" >> $TAOS_CFG -echo "sDebugFlag 143" >> $TAOS_CFG +echo "sDebugFlag 135" >> $TAOS_CFG echo "wDebugFlag 143" >> $TAOS_CFG echo "numOfLogLines 20000000" >> $TAOS_CFG echo "statusInterval 1" >> $TAOS_CFG diff --git a/tests/script/tsim/snode/basic1.sim b/tests/script/tsim/snode/basic1.sim index 2351403909..660951c591 100644 --- a/tests/script/tsim/snode/basic1.sim +++ b/tests/script/tsim/snode/basic1.sim @@ -75,46 +75,46 @@ if $data02 != LEADER then return -1 endi -print =============== create drop qnode 1 -sql create qnode on dnode 1 -sql show qnodes +print =============== create drop snode 1 +sql create snode on dnode 1 +sql show snodes if $rows != 1 then return -1 endi if $data00 != 1 then return -1 endi -sql_error create qnode on dnode 1 +sql_error create snode on dnode 1 -sql drop qnode on dnode 1 -sql show qnodes +sql drop snode on dnode 1 +sql show snodes if $rows != 0 then return -1 endi -sql_error drop qnode on dnode 1 +sql_error drop snode on dnode 1 -print =============== create drop qnode 2 -sql create qnode on dnode 2 -sql show qnodes +print =============== create drop snode 2 +sql create snode on dnode 2 +sql show snodes if $rows != 1 then return -1 endi if $data00 != 2 then return -1 endi -sql_error create qnode on dnode 2 +sql_error create snode on dnode 2 -sql drop qnode on dnode 2 -sql show qnodes +sql drop snode on dnode 2 +sql show snodes if $rows != 0 then return -1 endi -sql_error drop qnode on dnode 2 +sql_error drop snode on dnode 2 -print =============== create drop qnodes -sql create qnode on dnode 1 -sql create qnode on dnode 2 -sql show qnodes +print =============== create drop snodes +sql create snode on dnode 1 +sql create snode on dnode 2 +sql show snodes if $rows != 2 then return -1 endi @@ -126,7 +126,7 @@ system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode2 -s start sleep 2000 -sql show qnodes +sql show snodes if $rows != 2 then return -1 endi diff --git a/tests/script/tsim/tmq/basic1.sim b/tests/script/tsim/tmq/basic1.sim index d7534338e1..0c96635a78 100644 --- a/tests/script/tsim/tmq/basic1.sim +++ b/tests/script/tsim/tmq/basic1.sim @@ -25,7 +25,7 @@ $rowsPerCtb = 10 $tstart = 1640966400000 # 2022-01-01 00:00:00.000 #---- global parameters end ----# -$pullDelay = 5 +$pullDelay = 3 $ifcheckdata = 1 $showMsg = 1 $showRow = 0 diff --git a/tests/script/tsim/tmq/basic2.sim b/tests/script/tsim/tmq/basic2.sim index ac0d2bb6df..53f10e2247 100644 --- a/tests/script/tsim/tmq/basic2.sim +++ b/tests/script/tsim/tmq/basic2.sim @@ -25,7 +25,7 @@ $rowsPerCtb = 10 $tstart = 1640966400000 # 2022-01-01 00:00:00.000 #---- global parameters end ----# -$pullDelay = 5 +$pullDelay = 3 $ifcheckdata = 1 $showMsg = 1 $showRow = 0 diff --git a/tests/script/tsim/tmq/basic3.sim b/tests/script/tsim/tmq/basic3.sim index c0ba2c97fb..de771ba892 100644 --- a/tests/script/tsim/tmq/basic3.sim +++ b/tests/script/tsim/tmq/basic3.sim @@ -25,7 +25,7 @@ $rowsPerCtb = 10 $tstart = 1640966400000 # 2022-01-01 00:00:00.000 #---- global parameters end ----# -$pullDelay = 5 +$pullDelay = 3 $ifcheckdata = 1 $showMsg = 1 $showRow = 0 diff --git a/tests/script/tsim/tmq/basic4.sim b/tests/script/tsim/tmq/basic4.sim index 1eed93463c..42023bda7e 100644 --- a/tests/script/tsim/tmq/basic4.sim +++ b/tests/script/tsim/tmq/basic4.sim @@ -25,7 +25,7 @@ $rowsPerCtb = 10 $tstart = 1640966400000 # 2022-01-01 00:00:00.000 #---- global parameters end ----# -$pullDelay = 5 +$pullDelay = 3 $ifcheckdata = 1 $showMsg = 1 $showRow = 0