Merge remote-tracking branch 'origin/3.0' into feature/3.0_liaohj

This commit is contained in:
Haojun Liao 2022-04-29 22:23:13 +08:00
commit aad6740d47
28 changed files with 347 additions and 187 deletions

View File

@ -44,7 +44,6 @@ ENDIF ()
IF (TD_WINDOWS) IF (TD_WINDOWS)
MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}") MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}")
SET(CMAKE_GENERATOR "NMake Makefiles" CACHE INTERNAL "" FORCE)
SET(COMMON_FLAGS "/W3 /D_WIN32") SET(COMMON_FLAGS "/W3 /D_WIN32")
# IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900)) # IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900))

View File

@ -14,9 +14,7 @@
*/ */
#include <assert.h> #include <assert.h>
#include <signal.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h>
#include <string.h> #include <string.h>
#include <time.h> #include <time.h>
#include "taos.h" #include "taos.h"

View File

@ -842,7 +842,7 @@ typedef struct {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
int64_t dbUid; int64_t dbUid;
int32_t vgVersion; int32_t vgVersion;
int32_t numOfStables; int32_t numOfStables;
int32_t buffer; int32_t buffer;
int32_t pageSize; int32_t pageSize;
int32_t pages; int32_t pages;
@ -1442,32 +1442,32 @@ typedef struct {
SArray* lostConsumers; // SArray<int64_t> SArray* lostConsumers; // SArray<int64_t>
SArray* removedConsumers; // SArray<int64_t> SArray* removedConsumers; // SArray<int64_t>
SArray* newConsumers; // SArray<int64_t> SArray* newConsumers; // SArray<int64_t>
} SMqRebSubscribe; } SMqRebInfo;
static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) { static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
SMqRebSubscribe* pRebSub = (SMqRebSubscribe*)taosMemoryCalloc(1, sizeof(SMqRebSubscribe)); SMqRebInfo* pRebInfo = (SMqRebInfo*)taosMemoryCalloc(1, sizeof(SMqRebInfo));
if (pRebSub == NULL) { if (pRebInfo == NULL) {
goto _err; goto _err;
} }
strcpy(pRebSub->key, key); strcpy(pRebInfo->key, key);
pRebSub->lostConsumers = taosArrayInit(0, sizeof(int64_t)); pRebInfo->lostConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebSub->lostConsumers == NULL) { if (pRebInfo->lostConsumers == NULL) {
goto _err; goto _err;
} }
pRebSub->removedConsumers = taosArrayInit(0, sizeof(int64_t)); pRebInfo->removedConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebSub->removedConsumers == NULL) { if (pRebInfo->removedConsumers == NULL) {
goto _err; goto _err;
} }
pRebSub->newConsumers = taosArrayInit(0, sizeof(int64_t)); pRebInfo->newConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebSub->newConsumers == NULL) { if (pRebInfo->newConsumers == NULL) {
goto _err; goto _err;
} }
return pRebSub; return pRebInfo;
_err: _err:
taosArrayDestroy(pRebSub->lostConsumers); taosArrayDestroy(pRebInfo->lostConsumers);
taosArrayDestroy(pRebSub->removedConsumers); taosArrayDestroy(pRebInfo->removedConsumers);
taosArrayDestroy(pRebSub->newConsumers); taosArrayDestroy(pRebInfo->newConsumers);
taosMemoryFreeClear(pRebSub); taosMemoryFreeClear(pRebInfo);
return NULL; return NULL;
} }

View File

@ -231,6 +231,7 @@ typedef struct SSelectStmt {
uint8_t precision; uint8_t precision;
bool isEmptyResult; bool isEmptyResult;
bool hasAggFuncs; bool hasAggFuncs;
bool isTimeOrderQuery;
} SSelectStmt; } SSelectStmt;
typedef enum ESetOperatorType { SET_OP_TYPE_UNION_ALL = 1, SET_OP_TYPE_UNION } ESetOperatorType; typedef enum ESetOperatorType { SET_OP_TYPE_UNION_ALL = 1, SET_OP_TYPE_UNION } ESetOperatorType;

View File

@ -62,7 +62,7 @@ extern "C" {
#define strncasecmp _strnicmp #define strncasecmp _strnicmp
#define wcsncasecmp _wcsnicmp #define wcsncasecmp _wcsnicmp
#define strtok_r strtok_s #define strtok_r strtok_s
#define snprintf _snprintf // #define snprintf _snprintf
#define in_addr_t unsigned long #define in_addr_t unsigned long
// #define socklen_t int // #define socklen_t int

View File

@ -517,6 +517,7 @@ void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp);
typedef struct { typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN]; char key[TSDB_SUBSCRIBE_KEY_LEN];
SRWLatch lock; SRWLatch lock;
int64_t dbUid;
int32_t vgNum; int32_t vgNum;
int8_t subType; int8_t subType;
int8_t withTbName; int8_t withTbName;
@ -553,9 +554,8 @@ int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogO
void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog); void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog);
typedef struct { typedef struct {
const SMqSubscribeObj* pOldSub; int32_t oldConsumerNum;
const SMqTopicObj* pTopic; const SMqRebInfo* pRebInfo;
const SMqRebSubscribe* pRebInfo;
} SMqRebInputObj; } SMqRebInputObj;
typedef struct { typedef struct {

View File

@ -31,6 +31,8 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic);
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic); SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic);
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw); SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw);
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -134,15 +134,15 @@ FAIL:
return -1; return -1;
} }
static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, strlen(key) + 1); SMqRebInfo *pRebSub = taosHashGet(pHash, key, strlen(key) + 1);
if (pRebSub == NULL) { if (pRebSub == NULL) {
pRebSub = tNewSMqRebSubscribe(key); pRebSub = tNewSMqRebSubscribe(key);
if (pRebSub == NULL) { if (pRebSub == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebSubscribe)); taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebInfo));
} }
return pRebSub; return pRebSub;
} }
@ -189,7 +189,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
char key[TSDB_SUBSCRIBE_KEY_LEN]; char key[TSDB_SUBSCRIBE_KEY_LEN];
char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i); char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic); mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
} }
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
@ -200,7 +200,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
char key[TSDB_SUBSCRIBE_KEY_LEN]; char key[TSDB_SUBSCRIBE_KEY_LEN];
char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i); char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic); mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId); taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
} }
@ -209,7 +209,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
char key[TSDB_SUBSCRIBE_KEY_LEN]; char key[TSDB_SUBSCRIBE_KEY_LEN];
char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i); char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic); mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
} }
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);

View File

@ -241,7 +241,7 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) { void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) {
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp)); buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
#if 0 #if 0
int32_t sz; int32_t sz;
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
@ -277,6 +277,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
taosInitRWLatch(&pSubNew->lock); taosInitRWLatch(&pSubNew->lock);
pSubNew->dbUid = pSub->dbUid;
pSubNew->subType = pSub->subType; pSubNew->subType = pSub->subType;
pSubNew->withTbName = pSub->withTbName; pSubNew->withTbName = pSub->withTbName;
pSubNew->withSchema = pSub->withSchema; pSubNew->withSchema = pSub->withSchema;
@ -310,6 +311,7 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeString(buf, pSub->key); tlen += taosEncodeString(buf, pSub->key);
tlen += taosEncodeFixedI64(buf, pSub->dbUid);
tlen += taosEncodeFixedI32(buf, pSub->vgNum); tlen += taosEncodeFixedI32(buf, pSub->vgNum);
tlen += taosEncodeFixedI8(buf, pSub->subType); tlen += taosEncodeFixedI8(buf, pSub->subType);
tlen += taosEncodeFixedI8(buf, pSub->withTbName); tlen += taosEncodeFixedI8(buf, pSub->withTbName);
@ -336,6 +338,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) { void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
// //
buf = taosDecodeStringTo(buf, pSub->key); buf = taosDecodeStringTo(buf, pSub->key);
buf = taosDecodeFixedI64(buf, &pSub->dbUid);
buf = taosDecodeFixedI32(buf, &pSub->vgNum); buf = taosDecodeFixedI32(buf, &pSub->vgNum);
buf = taosDecodeFixedI8(buf, &pSub->subType); buf = taosDecodeFixedI8(buf, &pSub->subType);
buf = taosDecodeFixedI8(buf, &pSub->withTbName); buf = taosDecodeFixedI8(buf, &pSub->withTbName);

View File

@ -731,7 +731,6 @@ _OVER:
static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq) { static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode; SMnode *pMnode = pReq->pNode;
int32_t code = -1; int32_t code = -1;
SStbObj *pTopicStb = NULL;
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
SUserObj *pUser = NULL; SUserObj *pUser = NULL;
@ -762,12 +761,6 @@ static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq) {
goto _OVER; goto _OVER;
} }
pTopicStb = mndAcquireStb(pMnode, createReq.name);
if (pTopicStb != NULL) {
terrno = TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC;
goto _OVER;
}
pDb = mndAcquireDbByStb(pMnode, createReq.name); pDb = mndAcquireDbByStb(pMnode, createReq.name);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED; terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
@ -785,7 +778,7 @@ static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq) {
int32_t numOfStbs = -1; int32_t numOfStbs = -1;
mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs); mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
if (pDb->cfg.numOfStables == 1 && numOfStbs != 0 ) { if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB; terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
goto _OVER; goto _OVER;
} }
@ -799,7 +792,6 @@ _OVER:
} }
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
mndReleaseStb(pMnode, pTopicStb);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
tFreeSMCreateStbReq(&createReq); tFreeSMCreateStbReq(&createReq);

View File

@ -80,6 +80,7 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pSub->dbUid = pTopic->dbUid;
pSub->subType = pTopic->subType; pSub->subType = pTopic->subType;
pSub->withTbName = pTopic->withTbName; pSub->withTbName = pTopic->withTbName;
pSub->withSchema = pTopic->withSchema; pSub->withSchema = pTopic->withSchema;
@ -174,27 +175,20 @@ static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup)
return 0; return 0;
} }
static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, strlen(key) + 1); SMqRebInfo *pRebSub = taosHashGet(pHash, key, strlen(key) + 1);
if (pRebSub == NULL) { if (pRebSub == NULL) {
pRebSub = tNewSMqRebSubscribe(key); pRebSub = tNewSMqRebSubscribe(key);
if (pRebSub == NULL) { if (pRebSub == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebSubscribe)); taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebInfo));
} }
return pRebSub; return pRebSub;
} }
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) { static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
if (pInput->pTopic != NULL) {
// create subscribe
pOutput->pSub = mndCreateSub(pMnode, pInput->pTopic, pInput->pRebInfo->key);
ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) == 0);
} else {
pOutput->pSub = tCloneSubscribeObj(pInput->pOldSub);
}
int32_t totalVgNum = pOutput->pSub->vgNum; int32_t totalVgNum = pOutput->pSub->vgNum;
mInfo("mq rebalance subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum); mInfo("mq rebalance subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum);
@ -245,12 +239,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
} }
// 3. calc vg number of each consumer // 3. calc vg number of each consumer
int32_t oldSz = 0; int32_t afterRebConsumerNum = pInput->oldConsumerNum + taosArrayGetSize(pInput->pRebInfo->newConsumers) -
if (pInput->pOldSub) { taosArrayGetSize(pInput->pRebInfo->removedConsumers);
oldSz = taosHashGetSize(pInput->pOldSub->consumerHash);
}
int32_t afterRebConsumerNum =
oldSz + taosArrayGetSize(pInput->pRebInfo->newConsumers) - taosArrayGetSize(pInput->pRebInfo->removedConsumers);
int32_t minVgCnt = 0; int32_t minVgCnt = 0;
int32_t imbConsumerNum = 0; int32_t imbConsumerNum = 0;
// calc num // calc num
@ -488,22 +478,34 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
rebOutput.touchedConsumers = taosArrayInit(0, sizeof(void *)); rebOutput.touchedConsumers = taosArrayInit(0, sizeof(void *));
rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg)); rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter; SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter;
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key); SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key);
rebInput.pRebInfo = pRebInfo;
if (pSub == NULL) { if (pSub == NULL) {
// split sub key and extract topic // split sub key and extract topic
char topic[TSDB_TOPIC_FNAME_LEN]; char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pRebSub->key, topic, cgroup); mndSplitSubscribeKey(pRebInfo->key, topic, cgroup);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
ASSERT(pTopic); ASSERT(pTopic);
taosRLockLatch(&pTopic->lock); taosRLockLatch(&pTopic->lock);
rebInput.pTopic = pTopic;
}
rebInput.pRebInfo = pRebSub; rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key);
rebInput.pOldSub = pSub; ASSERT(taosHashGetSize(rebOutput.pSub->consumerHash) == 0);
taosRUnLockLatch(&pTopic->lock);
mndReleaseTopic(pMnode, pTopic);
rebInput.oldConsumerNum = 0;
} else {
taosRLockLatch(&pSub->lock);
rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
rebOutput.pSub = tCloneSubscribeObj(pSub);
taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
}
// TODO replace assert with error check // TODO replace assert with error check
ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0); ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0);
@ -516,14 +518,6 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) { if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
mError("persist rebalance output error, possibly vnode splitted or dropped"); mError("persist rebalance output error, possibly vnode splitted or dropped");
} }
if (rebInput.pTopic) {
SMqTopicObj *pTopic = (SMqTopicObj *)rebInput.pTopic;
taosRUnLockLatch(&pTopic->lock);
mndReleaseTopic(pMnode, pTopic);
} else {
mndReleaseSubscribe(pMnode, pSub);
}
} }
// reset flag // reset flag
@ -593,7 +587,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
int32_t dataPos = 0; int32_t dataPos = 0;
int32_t tlen; int32_t tlen;
SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
buf = taosMemoryMalloc(tlen + 1); buf = taosMemoryMalloc(tlen);
if (buf == NULL) goto SUB_DECODE_OVER; if (buf == NULL) goto SUB_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
@ -679,3 +673,36 @@ static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) {
mndTransProcessRsp(pRsp); mndTransProcessRsp(pRsp);
return 0; return 0;
} }
static int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
}
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
int32_t code = -1;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMqSubscribeObj *pSub = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
if (pIter == NULL) break;
if (pSub->dbUid != pDb->uid) {
sdbRelease(pSdb, pSub);
continue;
}
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
goto END;
}
}
code = 0;
END:
return code;
}

View File

@ -38,6 +38,8 @@ static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp);
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
int32_t mndInitTopic(SMnode *pMnode) { int32_t mndInitTopic(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_TOPIC, SSdbTable table = {.sdbType = SDB_TOPIC,
.keyType = SDB_KEY_BINARY, .keyType = SDB_KEY_BINARY,
@ -553,7 +555,41 @@ static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
return numOfRows; return numOfRows;
} }
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
}
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) { static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
} }
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
int32_t code = -1;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMqTopicObj *pTopic = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
if (pIter == NULL) break;
if (pTopic->dbUid != pDb->uid) {
sdbRelease(pSdb, pTopic);
continue;
}
if (mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) < 0) {
goto END;
}
}
code = 0;
END:
return code;
}

View File

@ -58,6 +58,48 @@ void tqClose(STQ* pTq) {
// TODO // TODO
} }
static void tdSRowDemo() {
#define DEMO_N_COLS 3
int16_t schemaVersion = 0;
int32_t numOfCols = DEMO_N_COLS; // ts + int
SRowBuilder rb = {0};
SSchema schema[DEMO_N_COLS] = {
{.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .name = "ts", .bytes = 8, .flags = SCHEMA_SMA_ON},
{.type = TSDB_DATA_TYPE_INT, .colId = 2, .name = "c1", .bytes = 4, .flags = SCHEMA_SMA_ON},
{.type = TSDB_DATA_TYPE_INT, .colId = 3, .name = "c2", .bytes = 4, .flags = SCHEMA_SMA_ON}};
SSchema* pSchema = schema;
STSchema* pTSChema = tdGetSTSChemaFromSSChema(&pSchema, numOfCols);
tdSRowInit(&rb, schemaVersion);
tdSRowSetTpInfo(&rb, numOfCols, pTSChema->flen);
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSChema);
void* row = taosMemoryCalloc(1, maxLen); // make sure the buffer is enough
// set row buf
tdSRowResetBuf(&rb, row);
for (int32_t idx = 0; idx < pTSChema->numOfCols; ++idx) {
STColumn* pColumn = pTSChema->columns + idx;
if (idx == 0) {
int64_t tsKey = 1651234567;
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, &tsKey, true, pColumn->offset, idx);
} else if (idx == 1) {
int32_t val1 = 10;
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, &val1, true, pColumn->offset, idx);
} else {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, true, pColumn->offset, idx);
}
}
// print
tdSRowPrint(row, pTSChema, __func__);
taosMemoryFree(pTSChema);
}
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
if (msgType != TDMT_VND_SUBMIT) return 0; if (msgType != TDMT_VND_SUBMIT) return 0;
void* pIter = NULL; void* pIter = NULL;

View File

@ -45,47 +45,6 @@ int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
#endif #endif
return 0; return 0;
} }
static void tdSRowDemo() {
#define DEMO_N_COLS 3
int16_t schemaVersion = 0;
int32_t numOfCols = DEMO_N_COLS; // ts + int
SRowBuilder rb = {0};
SSchema schema[DEMO_N_COLS] = {
{.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .name = "ts", .bytes = 8, .flags = SCHEMA_SMA_ON},
{.type = TSDB_DATA_TYPE_INT, .colId = 2, .name = "c1", .bytes = 4, .flags = SCHEMA_SMA_ON},
{.type = TSDB_DATA_TYPE_INT, .colId = 3, .name = "c2", .bytes = 4, .flags = SCHEMA_SMA_ON}};
SSchema *pSchema = schema;
STSchema *pTSChema = tdGetSTSChemaFromSSChema(&pSchema, numOfCols);
tdSRowInit(&rb, schemaVersion);
tdSRowSetTpInfo(&rb, numOfCols, pTSChema->flen);
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSChema);
void *row = taosMemoryCalloc(1, maxLen); // make sure the buffer is enough
// set row buf
tdSRowResetBuf(&rb, row);
for (int32_t idx = 0; idx < pTSChema->numOfCols; ++idx) {
STColumn *pColumn = pTSChema->columns + idx;
if (idx == 0) {
int64_t tsKey = 1651234567;
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, &tsKey, true, pColumn->offset, idx);
} else if (idx == 1) {
int32_t val1 = 10;
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, &val1, true, pColumn->offset, idx);
} else {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, true, pColumn->offset, idx);
}
}
// print
tdSRowPrint(row, pTSChema, __func__);
taosMemoryFree(pTSChema);
}
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) { int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
void *ptr = NULL; void *ptr = NULL;

View File

@ -221,7 +221,7 @@ static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
} }
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE }; pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -256,8 +256,7 @@ static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
} }
pFunc->node.resType = pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -439,6 +438,7 @@ static int32_t translateToJson(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// clang-format off
const SBuiltinFuncDefinition funcMgtBuiltins[] = { const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "count", .name = "count",
@ -568,7 +568,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "first", .name = "first",
.type = FUNCTION_TYPE_FIRST, .type = FUNCTION_TYPE_FIRST,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateFirstLast, .translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv, .getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
@ -578,7 +578,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "last", .name = "last",
.type = FUNCTION_TYPE_LAST, .type = FUNCTION_TYPE_LAST,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateFirstLast, .translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv, .getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
@ -588,7 +588,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "diff", .name = "diff",
.type = FUNCTION_TYPE_DIFF, .type = FUNCTION_TYPE_DIFF,
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC, .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateInOutNum, .translateFunc = translateInOutNum,
.getEnvFunc = getDiffFuncEnv, .getEnvFunc = getDiffFuncEnv,
.initFunc = diffFunctionSetup, .initFunc = diffFunctionSetup,
@ -976,5 +976,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.finalizeFunc = NULL .finalizeFunc = NULL
} }
}; };
// clang-format on
const int32_t funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFuncDefinition)); const int32_t funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFuncDefinition));

View File

@ -21,11 +21,8 @@
#include "taos.h" #include "taos.h"
#include "taoserror.h" #include "taoserror.h"
#include "thash.h" #include "thash.h"
#include "builtins.h"
#include "catalog.h"
#include "tudf.h" #include "tudf.h"
typedef struct SFuncMgtService { typedef struct SFuncMgtService {
SHashObj* pFuncNameHashTable; SHashObj* pFuncNameHashTable;
} SFuncMgtService; } SFuncMgtService;
@ -148,6 +145,8 @@ bool fmIsAggFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MG
bool fmIsScalarFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SCALAR_FUNC); } bool fmIsScalarFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SCALAR_FUNC); }
bool fmIsTimelineFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_TIMELINE_FUNC); }
bool fmIsPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_PSEUDO_COLUMN_FUNC); } bool fmIsPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_PSEUDO_COLUMN_FUNC); }
bool fmIsScanPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SCAN_PC_FUNC); } bool fmIsScanPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SCAN_PC_FUNC); }

View File

@ -1176,7 +1176,7 @@ int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInte
// input: interBuf // input: interBuf
// output: resultData // output: resultData
int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) { int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
int8_t callType = TSDB_UDF_CALL_AGG_PROC; int8_t callType = TSDB_UDF_CALL_AGG_FIN;
int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData); int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData);
return err; return err;
} }
@ -1243,12 +1243,12 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
} }
SUdfUvSession *session = (SUdfUvSession *)handle; SUdfUvSession *session = (SUdfUvSession *)handle;
SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo); SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize; int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
memset(udfRes, 0, envSize); memset(udfRes, 0, envSize);
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
udfRes->session = (SUdfUvSession *)handle; udfRes->session = (SUdfUvSession *)handle;
SUdfInterBuf buf = {0}; SUdfInterBuf buf = {0};
if (callUdfAggInit(handle, &buf) != 0) { if (callUdfAggInit(handle, &buf) != 0) {
@ -1260,7 +1260,6 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
} }
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
int32_t numOfCols = pInput->numOfInputCols; int32_t numOfCols = pInput->numOfInputCols;
@ -1320,13 +1319,15 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
SUdfInterBuf resultBuf = {.buf = udfRes->finalResBuf, SUdfInterBuf resultBuf = {0};
.bufLen = session->outputLen,
.numOfResult = udfRes->finalResNum};
SUdfInterBuf state = {.buf = udfRes->interResBuf, SUdfInterBuf state = {.buf = udfRes->interResBuf,
.bufLen = session->bufSize, .bufLen = session->bufSize,
.numOfResult = udfRes->interResNum}; .numOfResult = udfRes->interResNum};
callUdfAggFinalize(session, &state, &resultBuf); callUdfAggFinalize(session, &state, &resultBuf);
udfRes->finalResBuf = resultBuf.buf;
udfRes->finalResNum = resultBuf.numOfResult;
teardownUdf(session); teardownUdf(session);
if (resultBuf.numOfResult == 1) { if (resultBuf.numOfResult == 1) {

View File

@ -124,7 +124,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
char *finishSuffix = "_finish"; char *finishSuffix = "_finish";
strncpy(finishFuncName, processFuncName, strlen(processFuncName)); strncpy(finishFuncName, processFuncName, strlen(processFuncName));
strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggFinishFunc)); uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
//TODO: merge //TODO: merge
} }
return 0; return 0;

View File

@ -27,7 +27,7 @@ int32_t udf2_start(SUdfInterBuf *buf) {
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
int64_t sumSquares = *(int64_t*)interBuf->buf; int64_t sumSquares = *(int64_t*)interBuf->buf;
for (int32_t i = 0; i < block->numOfCols; ++i) { for (int32_t i = 0; i < block->numOfCols; ++i) {
for (int32_t j = 0; j < block->numOfRows; ++i) { for (int32_t j = 0; j < block->numOfRows; ++j) {
SUdfColumn* col = block->udfCols[i]; SUdfColumn* col = block->udfCols[i];
//TODO: check the bitmap for null value //TODO: check the bitmap for null value
int32_t* rows = (int32_t*)col->colData.fixLenCol.data; int32_t* rows = (int32_t*)col->colData.fixLenCol.data;
@ -35,7 +35,7 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInte
} }
} }
*(int64_t*)newInterBuf = sumSquares; *(int64_t*)(newInterBuf->buf) = sumSquares;
newInterBuf->bufLen = sizeof(int64_t); newInterBuf->bufLen = sizeof(int64_t);
//TODO: if all null value, numOfResult = 0; //TODO: if all null value, numOfResult = 0;
newInterBuf->numOfResult = 1; newInterBuf->numOfResult = 1;

View File

@ -154,6 +154,7 @@ static SNode* logicConditionNodeCopy(const SLogicConditionNode* pSrc, SLogicCond
} }
static SNode* functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst) { static SNode* functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst) {
COPY_ALL_SCALAR_FIELDS;
exprNodeCopy((const SExprNode*)pSrc, (SExprNode*)pDst); exprNodeCopy((const SExprNode*)pSrc, (SExprNode*)pDst);
COPY_CHAR_ARRAY_FIELD(functionName); COPY_CHAR_ARRAY_FIELD(functionName);
COPY_SCALAR_FIELD(funcId); COPY_SCALAR_FIELD(funcId);

View File

@ -2077,6 +2077,7 @@ static const char* jkFunctionName = "Name";
static const char* jkFunctionId = "Id"; static const char* jkFunctionId = "Id";
static const char* jkFunctionType = "Type"; static const char* jkFunctionType = "Type";
static const char* jkFunctionParameter = "Parameters"; static const char* jkFunctionParameter = "Parameters";
static const char* jkFunctionUdfBufSize = "UdfBufSize";
static int32_t functionNodeToJson(const void* pObj, SJson* pJson) { static int32_t functionNodeToJson(const void* pObj, SJson* pJson) {
const SFunctionNode* pNode = (const SFunctionNode*)pObj; const SFunctionNode* pNode = (const SFunctionNode*)pObj;
@ -2094,6 +2095,9 @@ static int32_t functionNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkFunctionParameter, pNode->pParameterList); code = nodeListToJson(pJson, jkFunctionParameter, pNode->pParameterList);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkFunctionUdfBufSize, pNode->udfBufSize);
}
return code; return code;
} }
@ -2114,6 +2118,9 @@ static int32_t jsonToFunctionNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkFunctionParameter, &pNode->pParameterList); code = jsonToNodeList(pJson, jkFunctionParameter, &pNode->pParameterList);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkFunctionUdfBufSize, &pNode->udfBufSize);
}
return code; return code;
} }

View File

@ -586,6 +586,7 @@ SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pPr
select->pProjectionList = pProjectionList; select->pProjectionList = pProjectionList;
select->pFromTable = pTable; select->pFromTable = pTable;
sprintf(select->stmtName, "%p", select); sprintf(select->stmtName, "%p", select);
select->isTimeOrderQuery = true;
return (SNode*)select; return (SNode*)select;
} }

View File

@ -319,7 +319,7 @@ static void setColumnInfoByExpr(const STableNode* pTable, SExprNode* pExpr, SCol
pCol->node.resType = pExpr->resType; pCol->node.resType = pExpr->resType;
} }
static int32_t createColumnNodeByTable(STranslateContext* pCxt, const STableNode* pTable, SNodeList* pList) { static int32_t createColumnsByTable(STranslateContext* pCxt, const STableNode* pTable, SNodeList* pList) {
if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) { if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta; const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta;
int32_t nums = int32_t nums =
@ -643,6 +643,7 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc)
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_AGG_FUNC_NESTING); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_AGG_FUNC_NESTING);
} }
pCxt->pCurrStmt->hasAggFuncs = true; pCxt->pCurrStmt->hasAggFuncs = true;
pCxt->pCurrStmt->isTimeOrderQuery = false;
} }
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
@ -939,7 +940,7 @@ static int32_t createAllColumns(STranslateContext* pCxt, SNodeList** pCols) {
size_t nums = taosArrayGetSize(pTables); size_t nums = taosArrayGetSize(pTables);
for (size_t i = 0; i < nums; ++i) { for (size_t i = 0; i < nums; ++i) {
STableNode* pTable = taosArrayGetP(pTables, i); STableNode* pTable = taosArrayGetP(pTables, i);
int32_t code = createColumnNodeByTable(pCxt, pTable, *pCols); int32_t code = createColumnsByTable(pCxt, pTable, *pCols);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
@ -1011,7 +1012,7 @@ static int32_t createTableAllCols(STranslateContext* pCxt, SColumnNode* pCol, SN
} }
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = createColumnNodeByTable(pCxt, pTable, *pOutput); code = createColumnsByTable(pCxt, pTable, *pOutput);
} }
return code; return code;
} }
@ -1093,52 +1094,33 @@ static int32_t createMultiResFuncsFromStar(STranslateContext* pCxt, SFunctionNod
return code; return code;
} }
static bool isCountStar(SNode* pNode) {
if (QUERY_NODE_FUNCTION != nodeType(pNode) || 1 != LIST_LENGTH(((SFunctionNode*)pNode)->pParameterList)) {
return false;
}
SNode* pPara = nodesListGetNode(((SFunctionNode*)pNode)->pParameterList, 0);
return (QUERY_NODE_COLUMN == nodeType(pPara) && 0 == strcmp(((SColumnNode*)pPara)->colName, "*"));
}
static int32_t rewriteCountStar(STranslateContext* pCxt, SFunctionNode* pCount) {
SColumnNode* pCol = nodesListGetNode(pCount->pParameterList, 0);
STableNode* pTable = NULL;
int32_t code = findTable(pCxt, ('\0' == pCol->tableAlias[0] ? NULL : pCol->tableAlias), &pTable);
if (TSDB_CODE_SUCCESS == code && QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
setColumnInfoBySchema((SRealTableNode*)pTable, ((SRealTableNode*)pTable)->pMeta->schema, false, pCol);
}
return code;
}
static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pSelect->pProjectionList) { // select * ... if (NULL == pSelect->pProjectionList) { // select * ...
return createAllColumns(pCxt, &pSelect->pProjectionList); return createAllColumns(pCxt, &pSelect->pProjectionList);
} else { } else {
SNode* pNode = NULL; SNode* pNode = NULL;
WHERE_EACH(pNode, pSelect->pProjectionList) { WHERE_EACH(pNode, pSelect->pProjectionList) {
int32_t code = TSDB_CODE_SUCCESS;
if (isMultiResFunc(pNode)) { if (isMultiResFunc(pNode)) {
SNodeList* pFuncs = NULL; SNodeList* pFuncs = NULL;
if (TSDB_CODE_SUCCESS != createMultiResFuncsFromStar(pCxt, (SFunctionNode*)pNode, &pFuncs)) { code = createMultiResFuncsFromStar(pCxt, (SFunctionNode*)pNode, &pFuncs);
return TSDB_CODE_OUT_OF_MEMORY; if (TSDB_CODE_SUCCESS == code) {
INSERT_LIST(pSelect->pProjectionList, pFuncs);
ERASE_NODE(pSelect->pProjectionList);
continue;
} }
INSERT_LIST(pSelect->pProjectionList, pFuncs);
ERASE_NODE(pSelect->pProjectionList);
continue;
} else if (isTableStar(pNode)) { } else if (isTableStar(pNode)) {
SNodeList* pCols = NULL; SNodeList* pCols = NULL;
if (TSDB_CODE_SUCCESS != createTableAllCols(pCxt, (SColumnNode*)pNode, &pCols)) { code = createTableAllCols(pCxt, (SColumnNode*)pNode, &pCols);
return TSDB_CODE_OUT_OF_MEMORY; if (TSDB_CODE_SUCCESS == code) {
} INSERT_LIST(pSelect->pProjectionList, pCols);
INSERT_LIST(pSelect->pProjectionList, pCols); ERASE_NODE(pSelect->pProjectionList);
ERASE_NODE(pSelect->pProjectionList); continue;
continue;
} else if (isCountStar(pNode)) {
int32_t code = rewriteCountStar(pCxt, (SFunctionNode*)pNode);
if (TSDB_CODE_SUCCESS != code) {
return code;
} }
} }
if (TSDB_CODE_SUCCESS != code) {
return code;
}
WHERE_NEXT; WHERE_NEXT;
} }
} }
@ -1254,12 +1236,14 @@ static int32_t translateGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL != pSelect->pGroupByList && NULL != pSelect->pWindow) { if (NULL != pSelect->pGroupByList && NULL != pSelect->pWindow) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GROUPBY_WINDOW_COEXIST); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GROUPBY_WINDOW_COEXIST);
} }
pCxt->currClause = SQL_CLAUSE_GROUP_BY; if (NULL != pSelect->pGroupByList) {
return translateExprList(pCxt, pSelect->pGroupByList); pCxt->currClause = SQL_CLAUSE_GROUP_BY;
pSelect->isTimeOrderQuery = false;
return translateExprList(pCxt, pSelect->pGroupByList);
}
return TSDB_CODE_SUCCESS;
} }
static bool isValTimeUnit(char unit) { return ('n' == unit || 'y' == unit); }
static int64_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char unit) { static int64_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char unit) {
int64_t days = convertTimeFromPrecisionToUnit(val, fromPrecision, 'd'); int64_t days = convertTimeFromPrecisionToUnit(val, fromPrecision, 'd');
switch (unit) { switch (unit) {
@ -1286,7 +1270,7 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode*
uint8_t precision = ((SColumnNode*)pInterval->pCol)->node.resType.precision; uint8_t precision = ((SColumnNode*)pInterval->pCol)->node.resType.precision;
SValueNode* pInter = (SValueNode*)pInterval->pInterval; SValueNode* pInter = (SValueNode*)pInterval->pInterval;
bool valInter = isValTimeUnit(pInter->unit); bool valInter = TIME_IS_VAR_DURATION(pInter->unit);
if (pInter->datum.i <= 0 || if (pInter->datum.i <= 0 ||
(!valInter && convertTimePrecision(pInter->datum.i, precision, TSDB_TIME_PRECISION_MICRO) < tsMinIntervalTime)) { (!valInter && convertTimePrecision(pInter->datum.i, precision, TSDB_TIME_PRECISION_MICRO) < tsMinIntervalTime)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_VALUE_TOO_SMALL, tsMinIntervalTime); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_VALUE_TOO_SMALL, tsMinIntervalTime);
@ -1300,7 +1284,7 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode*
if (pInter->unit == 'n' && pOffset->unit == 'y') { if (pInter->unit == 'n' && pOffset->unit == 'y') {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_OFFSET_UNIT); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_OFFSET_UNIT);
} }
bool fixed = !isValTimeUnit(pOffset->unit) && !valInter; bool fixed = !TIME_IS_VAR_DURATION(pOffset->unit) && !valInter;
if ((fixed && pOffset->datum.i >= pInter->datum.i) || if ((fixed && pOffset->datum.i >= pInter->datum.i) ||
(!fixed && getMonthsFromTimeVal(pOffset->datum.i, precision, pOffset->unit) >= (!fixed && getMonthsFromTimeVal(pOffset->datum.i, precision, pOffset->unit) >=
getMonthsFromTimeVal(pInter->datum.i, precision, pInter->unit))) { getMonthsFromTimeVal(pInter->datum.i, precision, pInter->unit))) {
@ -1312,7 +1296,7 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode*
const static int32_t INTERVAL_SLIDING_FACTOR = 100; const static int32_t INTERVAL_SLIDING_FACTOR = 100;
SValueNode* pSliding = (SValueNode*)pInterval->pSliding; SValueNode* pSliding = (SValueNode*)pInterval->pSliding;
if (pInter->unit == 'n' || pInter->unit == 'y') { if (TIME_IS_VAR_DURATION(pSliding->unit)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SLIDING_UNIT); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SLIDING_UNIT);
} }
if ((pSliding->datum.i < convertTimePrecision(tsMinSlidingTime, TSDB_TIME_PRECISION_MILLI, precision)) || if ((pSliding->datum.i < convertTimePrecision(tsMinSlidingTime, TSDB_TIME_PRECISION_MILLI, precision)) ||
@ -1419,6 +1403,78 @@ static int32_t checkLimit(STranslateContext* pCxt, SSelectStmt* pSelect) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static bool isCountStar(SFunctionNode* pFunc) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return false;
}
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
return (QUERY_NODE_COLUMN == nodeType(pPara) && 0 == strcmp(((SColumnNode*)pPara)->colName, "*"));
}
// count(*) is rewritten as count(ts) for scannning optimization
static int32_t rewriteCountStar(STranslateContext* pCxt, SFunctionNode* pCount) {
SColumnNode* pCol = nodesListGetNode(pCount->pParameterList, 0);
STableNode* pTable = NULL;
int32_t code = findTable(pCxt, ('\0' == pCol->tableAlias[0] ? NULL : pCol->tableAlias), &pTable);
if (TSDB_CODE_SUCCESS == code && QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
setColumnInfoBySchema((SRealTableNode*)pTable, ((SRealTableNode*)pTable)->pMeta->schema, false, pCol);
}
return code;
}
static int32_t createPrimaryKeyColByTable(STranslateContext* pCxt, STableNode* pTable, SNode** pPrimaryKey) {
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
setColumnInfoBySchema((SRealTableNode*)pTable, ((SRealTableNode*)pTable)->pMeta->schema, false, pCol);
} else {
// todo
}
*pPrimaryKey = (SNode*)pCol;
return TSDB_CODE_SUCCESS;
}
static int32_t createPrimaryKeyCol(STranslateContext* pCxt, SNode** pPrimaryKey) {
STableNode* pTable = NULL;
int32_t code = findTable(pCxt, NULL, &pTable);
if (TSDB_CODE_SUCCESS == code) {
code = createPrimaryKeyColByTable(pCxt, pTable, pPrimaryKey);
}
return code;
}
static int32_t rewriteTimelineFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
SNode* pPrimaryKey = NULL;
int32_t code = createPrimaryKeyCol(pCxt, &pPrimaryKey);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pFunc->pParameterList, pPrimaryKey);
}
return code;
}
EDealRes rewriteFuncForSelectImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
STranslateContext* pCxt = pContext;
SFunctionNode* pFunc = (SFunctionNode*)pNode;
if (isCountStar(pFunc)) {
pCxt->errCode = rewriteCountStar(pCxt, pFunc);
} else if (fmIsTimelineFunc(pFunc->funcId)) {
pCxt->errCode = rewriteTimelineFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS != pCxt->errCode) {
return DEAL_RES_ERROR;
}
}
return DEAL_RES_CONTINUE;
}
static int32_t rewriteFuncForSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
nodesWalkSelectStmt(pSelect, SQL_CLAUSE_FROM, rewriteFuncForSelectImpl, pCxt);
return pCxt->errCode;
}
static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
pCxt->pCurrStmt = pSelect; pCxt->pCurrStmt = pSelect;
int32_t code = translateFrom(pCxt, pSelect); int32_t code = translateFrom(pCxt, pSelect);
@ -1449,6 +1505,9 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkLimit(pCxt, pSelect); code = checkLimit(pCxt, pSelect);
} }
if (TSDB_CODE_SUCCESS == code) {
code = rewriteFuncForSelect(pCxt, pSelect);
}
return code; return code;
} }
@ -1703,13 +1762,17 @@ static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRete
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_RETENTIONS_OPTION); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_RETENTIONS_OPTION);
} }
SNode* pNode = NULL; SNode* pRetention = NULL;
FOREACH(pNode, pRetentions) { FOREACH(pRetention, pRetentions) {
SNode* pVal = NULL; SNode* pNode = NULL;
FOREACH(pVal, ((SNodeListNode*)pNode)->pNodeList) { FOREACH(pNode, ((SNodeListNode*)pRetention)->pNodeList) {
if (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pVal)) { SValueNode* pVal = (SValueNode*)pNode;
if (DEAL_RES_ERROR == translateValue(pCxt, pVal)) {
return pCxt->errCode; return pCxt->errCode;
} }
if (!TIME_IS_VAR_DURATION(pVal->unit)) {
pVal->datum.i = convertTimeFromPrecisionToUnit(pVal->datum.i, pVal->node.resType.precision, pVal->unit);
}
} }
} }

View File

@ -80,6 +80,18 @@ TEST_F(ParserSelectTest, multiResFunc) {
run("select last(t2.*), first(t1.c1, t2.*), last_row(t1.*, t2.*) from st1s1 t1, st1s2 t2 where t1.ts = t2.ts"); run("select last(t2.*), first(t1.c1, t2.*), last_row(t1.*, t2.*) from st1s1 t1, st1s2 t2 where t1.ts = t2.ts");
} }
TEST_F(ParserSelectTest, timelineFunc) {
useDb("root", "test");
run("select last(*), first(*) from t1");
run("select last(*), first(*) from t1 group by c1");
run("select last(*), first(*) from t1 interval(10s)");
run("select diff(c1) from t1");
}
TEST_F(ParserSelectTest, clause) { TEST_F(ParserSelectTest, clause) {
useDb("root", "test"); useDb("root", "test");

View File

@ -23,9 +23,9 @@ class PlanBasicTest : public PlannerTestBase {};
TEST_F(PlanBasicTest, select) { TEST_F(PlanBasicTest, select) {
useDb("root", "test"); useDb("root", "test");
// run("select * from t1"); run("select * from t1");
// run("select 1 from t1"); run("select 1 from t1");
// run("select * from st1"); run("select * from st1");
run("select 1 from st1"); run("select 1 from st1");
} }
@ -40,4 +40,10 @@ TEST_F(PlanBasicTest, join) {
run("select t1.c1, t2.c2 from st1s1 t1, st1s2 t2 where t1.ts = t2.ts"); run("select t1.c1, t2.c2 from st1s1 t1, st1s2 t2 where t1.ts = t2.ts");
run("select t1.c1, t2.c2 from st1s1 t1 join st1s2 t2 on t1.ts = t2.ts"); run("select t1.c1, t2.c2 from st1s1 t1 join st1s2 t2 on t1.ts = t2.ts");
} }
TEST_F(PlanBasicTest, func) {
useDb("root", "test");
run("select diff(c1) from t1");
}

View File

@ -42,3 +42,11 @@ TEST_F(PlanGroupByTest, withOrderBy) {
// order by alias of aggfunc // order by alias of aggfunc
// run("select count(*), sum(c1) a from t1 order by a"); // run("select count(*), sum(c1) a from t1 order by a");
} }
TEST_F(PlanGroupByTest, aggFunc) {
useDb("root", "test");
run("select last(*), first(*) from t1");
run("select last(*), first(*) from t1 group by c1");
}

View File

@ -20,6 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#include "osDef.h"
#include "qworker.h" #include "qworker.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "ttimer.h" #include "ttimer.h"
@ -301,9 +302,9 @@ typedef struct SQWorkerMgmt {
extern SQWorkerMgmt gQwMgmt; extern SQWorkerMgmt gQwMgmt;
FORCE_INLINE SQWorker *qwAcquire(int64_t refId) { return (SQWorker *)taosAcquireRef(atomic_load_32(&gQwMgmt.qwRef), refId); } static FORCE_INLINE SQWorker *qwAcquire(int64_t refId) { return (SQWorker *)taosAcquireRef(atomic_load_32(&gQwMgmt.qwRef), refId); }
FORCE_INLINE int32_t qwRelease(int64_t refId) { return taosReleaseRef(gQwMgmt.qwRef, refId); } static FORCE_INLINE int32_t qwRelease(int64_t refId) { return taosReleaseRef(gQwMgmt.qwRef, refId); }
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -28,6 +28,7 @@ static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg*
if (buf == NULL) { if (buf == NULL) {
return -1; return -1;
} }
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
((SMsgHead*)buf)->vgId = 0; ((SMsgHead*)buf)->vgId = 0;
req.taskId = pTask->inplaceDispatcher.taskId; req.taskId = pTask->inplaceDispatcher.taskId;