Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

This commit is contained in:
plum-lihui 2022-05-12 16:36:42 +08:00
commit 4059beac26
32 changed files with 340 additions and 74 deletions

View File

@ -32,6 +32,11 @@ static void msg_process(TAOS_RES* msg) {
int32_t numOfFields = taos_field_count(msg);
taos_print_row(buf, row, fields, numOfFields);
printf("%s\n", buf);
const char* tbName = tmq_get_table_name(msg);
if (tbName) {
printf("from tb: %s\n", tbName);
}
}
}
@ -101,8 +106,8 @@ int32_t create_topic() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column with table as select ts, c1, c2, c3 from st1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;

View File

@ -257,10 +257,7 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
// TODO
#if 0
DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res);
#endif
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
#if 0
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);

View File

@ -252,6 +252,7 @@ STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema);
typedef struct {
int32_t code;
int8_t hashMeta;
int64_t uid;
char* tblFName;
@ -271,7 +272,7 @@ typedef struct {
int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp);
int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp);
void tFreeSSubmitRsp(SSubmitRsp *pRsp);
void tFreeSSubmitRsp(SSubmitRsp* pRsp);
#define COL_SMA_ON ((int8_t)0x1)
#define COL_IDX_ON ((int8_t)0x2)
@ -2489,6 +2490,10 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i);
tlen += taosEncodeSSchemaWrapper(buf, pSW);
}
if (pRsp->withTbName) {
char* tbName = (char*)taosArrayGetP(pRsp->blockTbName, i);
tlen += taosEncodeString(buf, tbName);
}
}
}
return tlen;
@ -2501,6 +2506,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
if (pRsp->blockNum != 0) {
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
@ -2519,6 +2525,11 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf = taosDecodeSSchemaWrapper(buf, pSW);
taosArrayPush(pRsp->blockSchema, &pSW);
}
if (pRsp->withTbName) {
char* name = NULL;
buf = taosDecodeString(buf, &name);
taosArrayPush(pRsp->blockTbName, &name);
}
}
}
return (void*)buf;

View File

@ -89,6 +89,7 @@ typedef struct SUdfColumnData {
typedef struct SUdfColumn {
SUdfColumnMeta colMeta;
bool hasNull;
SUdfColumnData colData;
} SUdfColumn;
@ -232,6 +233,7 @@ static FORCE_INLINE void udfColDataSetNull(SUdfColumn* pColumn, int32_t row) {
} else {
udfColDataSetNull_f(pColumn, row);
}
pColumn->hasNull = true;
}
static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {

View File

@ -59,6 +59,21 @@ static FORCE_INLINE void *taosDecodeFixedI8(const void *buf, int8_t *value) {
static FORCE_INLINE void *taosSkipFixedLen(const void *buf, size_t len) { return POINTER_SHIFT(buf, len); }
// --- Bool
static FORCE_INLINE int32_t taosEncodeFixedBool(void **buf, bool value) {
if (buf != NULL) {
((int8_t *)(*buf))[0] = value ? 1 : 0;
*buf = POINTER_SHIFT(*buf, sizeof(int8_t));
}
return (int32_t)sizeof(int8_t);
}
static FORCE_INLINE void *taosDecodeFixedBool(const void *buf, bool *value) {
*value = ((int8_t *)buf)[0] == 0 ? false : true;
return POINTER_SHIFT(buf, sizeof(int8_t));
}
// ---- Fixed U16
static FORCE_INLINE int32_t taosEncodeFixedU16(void **buf, uint16_t value) {
if (buf != NULL) {

View File

@ -1346,3 +1346,16 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
return -1;
}
}
const char* tmq_get_table_name(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
pRspObj->resIter >= pRspObj->rsp.blockNum) {
return NULL;
}
const char* name = taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
return name;
}
return NULL;
}

View File

@ -1311,6 +1311,7 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
tlen += taosEncodeFixedI16(buf, pColData->info.colId);
tlen += taosEncodeFixedI16(buf, pColData->info.type);
tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
tlen += taosEncodeFixedBool(buf, pColData->hasNull);
if (IS_VAR_DATA_TYPE(pColData->info.type)) {
tlen += taosEncodeBinary(buf, pColData->varmeta.offset, sizeof(int32_t) * rows);
@ -1340,6 +1341,7 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
buf = taosDecodeFixedI16(buf, &data.info.colId);
buf = taosDecodeFixedI16(buf, &data.info.type);
buf = taosDecodeFixedI32(buf, &data.info.bytes);
buf = taosDecodeFixedBool(buf, &data.hasNull);
if (IS_VAR_DATA_TYPE(data.info.type)) {
buf = taosDecodeBinary(buf, (void**)&data.varmeta.offset, pBlock->info.rows * sizeof(int32_t));

View File

@ -4032,6 +4032,7 @@ int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) {
static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBlock) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI32(pEncoder, pBlock->code) < 0) return -1;
if (tEncodeI8(pEncoder, pBlock->hashMeta) < 0) return -1;
if (pBlock->hashMeta) {
if (tEncodeI64(pEncoder, pBlock->uid) < 0) return -1;
@ -4047,10 +4048,11 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl
static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI32(pDecoder, &pBlock->code) < 0) return -1;
if (tDecodeI8(pDecoder, &pBlock->hashMeta) < 0) return -1;
if (pBlock->hashMeta) {
if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1;
pBlock->tblFName= taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1);
pBlock->tblFName = taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1);
if (NULL == pBlock->tblFName) return -1;
if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1;
}
@ -4089,7 +4091,7 @@ int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) {
if (tDecodeSSubmitBlkRsp(pDecoder, pRsp->pBlocks + iBlock) < 0) return -1;
}
tEndDecode(pDecoder);
tEndDecode(pDecoder);
tDecoderClear(pDecoder);
return 0;
}
@ -4108,4 +4110,3 @@ void tFreeSSubmitRsp(SSubmitRsp *pRsp) {
taosMemoryFree(pRsp);
}

View File

@ -318,6 +318,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
pOld->updateTime = pNew->updateTime;
pOld->version = pNew->version;
pOld->nextColId = pNew->nextColId;
pOld->ttl = pNew->ttl;
pOld->numOfColumns = pNew->numOfColumns;
pOld->numOfTags = pNew->numOfTags;
memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema));
@ -832,7 +833,7 @@ static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp) {
}
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
if (pAlter->commentLen != 0) return 0;
if (pAlter->commentLen != 0 || pAlter->ttl != 0) return 0;
if (pAlter->numOfFields < 1 || pAlter->numOfFields != (int32_t)taosArrayGetSize(pAlter->pFields)) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
@ -883,7 +884,8 @@ static int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) {
return 0;
}
static int32_t mndUpdateStbComment(const SStbObj *pOld, SStbObj *pNew, char *pComment, int32_t commentLen) {
static int32_t mndUpdateStbCommentAndTTL(const SStbObj *pOld, SStbObj *pNew, char *pComment, int32_t commentLen,
int32_t ttl) {
if (commentLen > 0) {
pNew->commentLen = commentLen;
pNew->comment = taosMemoryCalloc(1, commentLen);
@ -893,6 +895,9 @@ static int32_t mndUpdateStbComment(const SStbObj *pOld, SStbObj *pNew, char *pCo
}
memcpy(pNew->comment, pComment, commentLen);
}
if (ttl >= 0) {
pNew->ttl = ttl;
}
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
@ -1232,7 +1237,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SNodeMsg *pReq, const SMAlterStbReq *
code = mndAlterStbColumnBytes(pOld, &stbObj, pField0);
break;
case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
code = mndUpdateStbComment(pOld, &stbObj, pAlter->comment, pAlter->commentLen);
code = mndUpdateStbCommentAndTTL(pOld, &stbObj, pAlter->comment, pAlter->commentLen, pAlter->ttl);
break;
default:
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
@ -1723,7 +1728,7 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables
char *p = taosMemoryMalloc(pStb->commentLen + VARSTR_HEADER_SIZE); // check malloc failures
char *p = taosMemoryCalloc(1, pStb->commentLen + 1 + VARSTR_HEADER_SIZE); // check malloc failures
if (p != NULL) {
if (pStb->commentLen != 0) {
STR_TO_VARSTR(p, pStb->comment);

View File

@ -297,8 +297,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
topicObj.ast = strdup(pCreate->ast);
topicObj.astLen = strlen(pCreate->ast) + 1;
topicObj.subType = TOPIC_SUB_TYPE__TABLE;
topicObj.withTbName = 0;
topicObj.withSchema = 0;
topicObj.withTbName = pCreate->withTbName;
topicObj.withSchema = pCreate->withSchema;
SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {

View File

@ -116,10 +116,10 @@ int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur);
// SMetaDB
int metaOpenDB(SMeta* pMeta);
void metaCloseDB(SMeta* pMeta);
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg);
int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
// int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg);
int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
#endif
#endif
@ -128,4 +128,4 @@ int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
}
#endif
#endif /*_TD_VNODE_META_H_*/
#endif /*_TD_VNODE_META_H_*/

View File

@ -158,7 +158,9 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
skmDbKey.sver = sver;
pKey = &skmDbKey;
kLen = sizeof(skmDbKey);
metaRLock(pMeta);
ret = tdbDbGet(pMeta->pSkmDb, pKey, kLen, &pVal, &vLen);
metaULock(pMeta);
if (ret < 0) {
return NULL;
}
@ -181,6 +183,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
}
struct SMCtbCursor {
SMeta *pMeta;
TDBC *pCur;
tb_uid_t suid;
void *pKey;
@ -200,9 +203,13 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
return NULL;
}
pCtbCur->pMeta = pMeta;
pCtbCur->suid = uid;
metaRLock(pMeta);
ret = tdbDbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL);
if (ret < 0) {
metaULock(pMeta);
taosMemoryFree(pCtbCur);
return NULL;
}
@ -220,6 +227,7 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) {
if (pCtbCur) {
if (pCtbCur->pMeta) metaULock(pCtbCur->pMeta);
if (pCtbCur->pCur) {
tdbDbcClose(pCtbCur->pCur);
@ -269,7 +277,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
pSW = metaGetTableSchema(pMeta, quid, sver, 0);
if (!pSW) return NULL;
tdInitTSchemaBuilder(&sb, 0);
for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i;

View File

@ -427,9 +427,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pReq->currentOffset;
rsp.withSchema = pExec->withSchema;
rsp.withTbName = pExec->withTbName;
rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
rsp.blockTbName = taosArrayInit(0, sizeof(void*));
while (1) {
consumerEpoch = atomic_load_32(&pExec->epoch);
@ -535,6 +538,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosArrayPush(rsp.blockSchema, &pSW);
}
if (pExec->withTbName) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
if (metaGetTableEntryByUid(&mr, uid) < 0) {
ASSERT(0);
}
char* tbName = strdup(mr.me.name);
taosArrayPush(rsp.blockTbName, &tbName);
metaReaderClear(&mr);
}
rsp.blockNum++;
}
// db subscribe
@ -563,6 +578,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf);
if (pExec->withTbName) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
if (metaGetTableEntryByUid(&mr, block.info.uid) < 0) {
ASSERT(0);
}
char* tbName = strdup(mr.me.name);
taosArrayPush(rsp.blockTbName, &tbName);
metaReaderClear(&mr);
}
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
taosArrayPush(rsp.blockSchema, &pSW);
@ -614,6 +639,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen);
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree);
return 0;
}

View File

@ -70,6 +70,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static void tsdbResetCommitFile(SCommitH *pCommith);
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static int tsdbCommitToTable(SCommitH *pCommith, int tid);
static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx);
static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx);
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
@ -485,8 +486,10 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) {
for (int i = 1; i < pCommith->niters; i++) {
tSkipListDestroyIter(pCommith->iters[i].pIter);
tdFreeSchema(pCommith->iters[i].pTable->pSchema);
taosMemoryFree(pCommith->iters[i].pTable);
if (pCommith->iters[i].pTable) {
tdFreeSchema(pCommith->iters[i].pTable->pSchema);
taosMemoryFreeClear(pCommith->iters[i].pTable);
}
}
taosMemoryFree(pCommith->iters);
@ -889,9 +892,11 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
}
static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
SReadH *pReadh = &pCommith->readh;
int nBlocks = pIdx->numOfBlocks;
int bidx = 0;
SReadH *pReadh = &pCommith->readh;
STsdb *pTsdb = TSDB_READ_REPO(pReadh);
STSchema *pTSchema = NULL;
int nBlocks = pIdx->numOfBlocks;
int bidx = 0;
tsdbResetCommitTable(pCommith);
@ -901,30 +906,49 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
return -1;
}
STable table = {.tid = pIdx->uid, .uid = pIdx->uid, .pSchema = NULL};
pCommith->pTable = &table;
while (bidx < nBlocks) {
if (!pTSchema && !tsdbCommitIsSameFile(pCommith, bidx)) {
// Set commit table
pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, 0); // TODO: schema version
if (!pTSchema) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
table.pSchema = pTSchema;
if (tsdbSetCommitTable(pCommith, &table) < 0) {
taosMemoryFreeClear(pTSchema);
return -1;
}
}
if (tsdbMoveBlock(pCommith, bidx) < 0) {
tsdbError("vgId:%d failed to move block into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
taosMemoryFreeClear(pTSchema);
return -1;
}
++bidx;
}
STable table = {.tid = pIdx->uid, .uid = pIdx->uid, .pSchema = NULL};
TSDB_COMMIT_TABLE(pCommith) = &table;
if (tsdbWriteBlockInfo(pCommith) < 0) {
tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
taosMemoryFreeClear(pTSchema);
return -1;
}
taosMemoryFreeClear(pTSchema);
return 0;
}
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) {
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
pCommith->pTable = pTable;
if (tdInitDataCols(pCommith->pDataCols, pSchema) < 0) {
@ -1321,6 +1345,14 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
return 0;
}
static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx) {
SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
if (pBlock->last) {
return pCommith->isLFileSame;
}
return pCommith->isDFileSame;
}
static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
SDFile *pDFile;

View File

@ -62,6 +62,16 @@ int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) {
void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable) {
if (pMemTable) {
taosHashCleanup(pMemTable->pHashIdx);
SSkipListIterator *pIter = tSkipListCreateIter(pMemTable->pSlIdx);
SSkipListNode *pNode = NULL;
STbData *pTbData = NULL;
for (;;) {
if (!tSkipListIterNext(pIter)) break;
pNode = tSkipListIterGet(pIter);
pTbData = (STbData *)pNode->pData;
tsdbFreeTbData(pTbData);
}
tSkipListDestroyIter(pIter);
tSkipListDestroy(pMemTable->pSlIdx);
taosMemoryFree(pMemTable);
}
@ -300,6 +310,17 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
TSKEY keyMax;
SSubmitBlk *pBlkCopy;
// check if table exists
SMetaReader mr = {0};
SMetaEntry me = {0};
metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
if (metaGetTableEntryByUid(&mr, pMsgIter->uid) < 0) {
metaReaderClear(&mr);
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
}
metaReaderClear(&mr);
// create container is nedd
tptr = taosHashGet(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid));
if (tptr == NULL) {

View File

@ -502,7 +502,7 @@ _exit:
return 0;
}
static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) {
static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) {
SSubmitBlkIter blkIter = {0};
STSchema *pSchema = NULL;
tb_uid_t suid = 0;
@ -544,7 +544,7 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char
while (true) {
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
vnodeDebugPrintSingleSubmitMsg(pMeta, pBlock, &msgIter, tags);
}
@ -595,7 +595,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
pRsp->code = terrno;
submitBlkRsp.code = terrno;
tDecoderClear(&decoder);
goto _exit;
}
@ -617,8 +617,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
}
if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) {
pRsp->code = terrno;
goto _exit;
submitBlkRsp.code = terrno;
}
submitRsp.numOfRows += submitBlkRsp.numOfRows;

View File

@ -72,6 +72,7 @@ int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) {
int32_t ret = 0;
SMsgCb *pMsgCb = rpcHandle;
if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) {
pMsg->noResp = 1;
tmsgSendReq(rpcHandle, pEpSet, pMsg);
} else {
vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE);

View File

@ -616,10 +616,10 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
void doBuildResultDatablock(SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf);
void doBuildResultDatablock(SExecTaskInfo *taskInfo, SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf);
void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);

View File

@ -155,7 +155,7 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
static int32_t doCopyToSDataBlock(SExecTaskInfo *taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo, int32_t orderType, int32_t* rowCellOffset,
SqlFunctionCtx* pCtx);
@ -579,7 +579,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
}
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].startTs = pWin->skey;
@ -618,9 +618,14 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData*
pEntryInfo->numOfRes = 1;
continue;
}
int32_t code = TSDB_CODE_SUCCESS;
if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
pCtx[k].fpSet.process(&pCtx[k]);
code = pCtx[k].fpSet.process(&pCtx[k]);
if (code != TSDB_CODE_SUCCESS) {
qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
taskInfo->code = code;
longjmp(taskInfo->env, code);
}
}
// restore it
@ -806,7 +811,13 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction
// this can be set during create the struct
// todo add a dummy funtion to avoid process check
if (pCtx[k].fpSet.process != NULL) {
pCtx[k].fpSet.process(&pCtx[k]);
int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
if (code != TSDB_CODE_SUCCESS) {
qError("%s call aggregate function error happens, code : %s",
GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
pOperator->pTaskInfo->code = code;
longjmp(pOperator->pTaskInfo->env, code);
}
}
}
}
@ -2176,7 +2187,7 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
* @param pQInfo
* @param result
*/
int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx) {
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows
@ -2215,8 +2226,14 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset);
if (pCtx[j].fpSet.process) {
pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (pCtx[j].fpSet.finalize) {
int32_t code = TSDB_CODE_SUCCESS;
code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code)) {
qError("%s build result data block error, code %s", GET_TASKID(taskInfo), tstrerror(code));
taskInfo->code = code;
longjmp(taskInfo->env, code);
}
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor
} else {
@ -2243,7 +2260,7 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
return 0;
}
void doBuildResultDatablock(SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
void doBuildResultDatablock(SExecTaskInfo *taskInfo, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
SDiskbasedBuf* pBuf) {
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
@ -2257,7 +2274,7 @@ void doBuildResultDatablock(SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo
}
int32_t orderType = TSDB_ORDER_ASC;
doCopyToSDataBlock(pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx);
doCopyToSDataBlock(taskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx);
// add condition (pBlock->info.rows >= 1) just to runtime happy
blockDataUpdateTsWindow(pBlock);
@ -3749,7 +3766,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
}
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pInfo, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf);
doBuildResultDatablock(pTaskInfo, pInfo, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf);
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}

View File

@ -234,7 +234,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
int32_t rowIndex = j - num;
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
// assign the group keys or user input constant values if required
doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
@ -252,7 +252,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
int32_t rowIndex = pBlock->info.rows - num;
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
}
}
@ -268,7 +268,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
@ -317,7 +317,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false);
while(1) {
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);

View File

@ -703,7 +703,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
pInfo->order, false);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
STimeWindow nextWin = win;
@ -740,7 +740,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
pInfo->order, false);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
}
@ -855,7 +855,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
// here we start a new session window
@ -874,7 +874,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
}
@ -888,7 +888,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
doBuildResultDatablock(pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
return NULL;
@ -921,7 +921,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
doBuildResultDatablock(pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
@ -948,7 +948,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
}
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
@ -998,7 +998,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
doBuildResultDatablock(pOperator->pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
@ -1035,7 +1035,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
doBuildResultDatablock(pOperator->pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
// TODO: remove for stream
/*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/
@ -1233,7 +1233,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
// pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
// here we start a new session window
@ -1252,7 +1252,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
}
@ -1265,7 +1265,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
doBuildResultDatablock(pOperator->pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
return NULL;
@ -1298,7 +1298,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
doBuildResultDatablock(pOperator->pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}

View File

@ -95,6 +95,9 @@ bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t stateCountFunction(SqlFunctionCtx* pCtx);
int32_t stateDurationFunction(SqlFunctionCtx* pCtx);
bool getCsumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t csumFunction(SqlFunctionCtx* pCtx);
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
#ifdef __cplusplus

View File

@ -308,6 +308,37 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32
return TSDB_CODE_SUCCESS;
}
static int32_t translateCsum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return TSDB_CODE_SUCCESS;
}
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"The input parameter of CSUM function can only be column");
}
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
uint8_t resType;
if (!IS_NUMERIC_TYPE(colType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
} else {
if (IS_SIGNED_NUMERIC_TYPE(colType)) {
resType = TSDB_DATA_TYPE_BIGINT;
} else if (IS_UNSIGNED_NUMERIC_TYPE(colType)) {
resType = TSDB_DATA_TYPE_UBIGINT;
} else if (IS_FLOAT_TYPE(colType)) {
resType = TSDB_DATA_TYPE_DOUBLE;
} else {
ASSERT(0);
}
}
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType};
return TSDB_CODE_SUCCESS;
}
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// todo
return TSDB_CODE_SUCCESS;
@ -742,6 +773,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = stateDurationFunction,
.finalizeFunc = NULL
},
{
.name = "csum",
.type = FUNCTION_TYPE_CSUM,
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateCsum,
.getEnvFunc = getCsumFuncEnv,
.initFunc = functionSetup,
.processFunc = csumFunction,
.finalizeFunc = NULL
},
{
.name = "abs",
.type = FUNCTION_TYPE_ABS,

View File

@ -2818,7 +2818,6 @@ int32_t stateCountFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
int32_t numOfElems = 0;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
@ -2856,7 +2855,6 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
int32_t numOfElems = 0;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
@ -2896,3 +2894,58 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
return numOfElems;
}
bool getCsumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SSumRes);
return true;
}
int32_t csumFunction(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SSumRes* pSumRes = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
int32_t numOfElems = 0;
int32_t type = pInputCol->info.type;
int32_t startOffset = pCtx->offset;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
int32_t pos = startOffset + numOfElems;
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
//colDataAppendNULL(pOutput, i);
continue;
}
char* data = colDataGetData(pInputCol, i);
if (IS_SIGNED_NUMERIC_TYPE(type)) {
int64_t v;
GET_TYPED_DATA(v, int64_t, type, data);
pSumRes->isum += v;
colDataAppend(pOutput, pos, (char *)&pSumRes->isum, false);
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
uint64_t v;
GET_TYPED_DATA(v, uint64_t, type, data);
pSumRes->usum += v;
colDataAppend(pOutput, pos, (char *)&pSumRes->usum, false);
} else if (IS_FLOAT_TYPE(type)) {
double v;
GET_TYPED_DATA(v, double, type, data);
pSumRes->dsum += v;
colDataAppend(pOutput, pos, (char *)&pSumRes->dsum, false);
}
//TODO: remove this after pTsOutput is handled
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
numOfElems++;
}
return numOfElems;
}

View File

@ -695,6 +695,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
udfCol->colMeta.scale = col->info.scale;
udfCol->colMeta.precision = col->info.precision;
udfCol->colData.numOfRows = udfBlock->numOfRows;
udfCol->hasNull = col->hasNull;
if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) {
udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
@ -731,6 +732,7 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
col->info.bytes = meta->bytes;
col->info.scale = meta->scale;
col->info.type = meta->type;
col->hasNull = udfCol->hasNull;
SUdfColumnData *data = &udfCol->colData;
if (!IS_VAR_DATA_TYPE(meta->type)) {

View File

@ -114,6 +114,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, tdb_cmpr_fn_t kcmpr, SB
int tdbBtreeClose(SBTree *pBt) {
if (pBt) {
tdbFree(pBt->pBuf);
tdbOsFree(pBt);
}
return 0;

View File

@ -242,7 +242,7 @@ static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) {
int h;
h = tdbPCachePageHash(&(pPage->pgid));
for (ppPage = &(pCache->pgHash[h % pCache->nHash]); *ppPage != pPage; ppPage = &((*ppPage)->pHashNext))
for (ppPage = &(pCache->pgHash[h % pCache->nHash]); (*ppPage) && *ppPage != pPage; ppPage = &((*ppPage)->pHashNext))
;
ASSERT(*ppPage == pPage);
*ppPage = pPage->pHashNext;

View File

@ -130,6 +130,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
}
}
// code set inner
if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) {
return -1;
}
@ -249,16 +250,22 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
// TODO: check wal life
if (pRead->curVersion != ver) {
if (walReadSeekVer(pRead, ver) < 0) {
terrno = TSDB_CODE_WAL_INVALID_VER;
wError("unexpected wal log version: % " PRId64 ", since seek error", ver);
return -1;
}
}
if (!taosValidFile(pRead->pReadLogTFile)) {
return -1;
}
/*if (!taosValidFile(pRead->pReadLogTFile)) {*/
/*return -1;*/
/*}*/
code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead));
if (code != sizeof(SWalHead)) {
if (code < 0)
terrno = TAOS_SYSTEM_ERROR(errno);
else
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}

View File

@ -225,6 +225,7 @@ int walRoll(SWal *pWal) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
// terrno set inner
code = walRollFileInfo(pWal);
if (code != 0) {
return -1;

View File

@ -159,6 +159,7 @@ sql alter table db.stb rename tag t1 tx
print ========== alter common
sql alter table db.stb comment 'abcde' ;
sql alter table db.stb ttl 10 ;
sql show db.stables;
if $data[0][6] != abcde then

View File

@ -346,8 +346,10 @@ class TDTestCase:
return
def test_case3(self):
self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 1, 8, 1*10000)
# self.taosBenchCreate("test209","no","db2", "stb2", 1, 8, 1*10000)
# self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*10000)
self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*1000)
# self.taosBenchCreate("db1", "stb1", 4, 5, 100*10000)
# self.taosBenchCreate("db1", "stb1", 1, 5, 100*10000)

View File

@ -29,8 +29,8 @@
"batch_create_tbl_num": 50000,
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": 0,
"interlace_rows": 0,
"insert_rows": 10,
"interlace_rows": 100000,
"insert_interval": 0,
"max_sql_len": 10000000,
"disorder_ratio": 0,