Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-30837
This commit is contained in:
commit
6a2a5b80a1
|
@ -236,7 +236,7 @@ typedef struct {
|
|||
void* vnode; // not available to encoder and decoder
|
||||
FTbSink* tbSinkFunc;
|
||||
STSchema* pTSchema;
|
||||
SSHashObj* pTblInfo;
|
||||
SSHashObj* pTbInfo;
|
||||
} STaskSinkTb;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -748,13 +748,13 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
|
|||
return terrno;
|
||||
}
|
||||
|
||||
pOutputInfo->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
||||
if (pOutputInfo->tbSink.pTblInfo == NULL) {
|
||||
pOutputInfo->tbSink.pTbInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
||||
if (pOutputInfo->tbSink.pTbInfo == NULL) {
|
||||
tqError("vgId:%d failed init sink tableInfo, code:%s", vgId, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
tSimpleHashSetFreeFp(pOutputInfo->tbSink.pTblInfo, freePtr);
|
||||
tSimpleHashSetFreeFp(pOutputInfo->tbSink.pTbInfo, freePtr);
|
||||
}
|
||||
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
#include "tmsg.h"
|
||||
#include "tq.h"
|
||||
|
||||
#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1))
|
||||
|
||||
typedef struct STableSinkInfo {
|
||||
uint64_t uid;
|
||||
tstr name;
|
||||
|
@ -35,16 +37,22 @@ static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema
|
|||
int64_t earlyTs, const char* id);
|
||||
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
|
||||
const char* dstTableName, int64_t* uid);
|
||||
static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId,
|
||||
const char* id);
|
||||
static int32_t doRemoveFromCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id);
|
||||
|
||||
static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
|
||||
static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName,
|
||||
int32_t numOfTags);
|
||||
static int32_t createDefaultTagColName(SArray** pColNameList);
|
||||
static int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
|
||||
int64_t gid, bool newSubTableRule);
|
||||
static int32_t doCreateSinkInfo(const char* pDstTableName, STableSinkInfo** pInfo);
|
||||
static int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock,
|
||||
const char* stbFullName, int64_t gid, bool newSubTableRule);
|
||||
static int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo);
|
||||
static int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId,
|
||||
const char* id);
|
||||
static bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo);
|
||||
static int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id);
|
||||
static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode);
|
||||
static void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
|
||||
static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode,
|
||||
int64_t earlyTs);
|
||||
|
||||
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
||||
const char* pIdStr, bool newSubTableRule) {
|
||||
|
@ -81,7 +89,8 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
|
|||
|
||||
memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
|
||||
name[varDataLen(varTbName)] = '\0';
|
||||
if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 && stbFullName) {
|
||||
if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 &&
|
||||
stbFullName) {
|
||||
int32_t code = buildCtbNameAddGroupId(stbFullName, name, groupId, cap);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -161,16 +170,6 @@ end:
|
|||
return ret;
|
||||
}
|
||||
|
||||
static bool tqGetTableInfo(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo) {
|
||||
void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
|
||||
if (pVal) {
|
||||
*pInfo = *(STableSinkInfo**)pVal;
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
|
||||
void* buf = NULL;
|
||||
int32_t tlen = 0;
|
||||
|
@ -201,7 +200,7 @@ int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const
|
|||
int32_t code = tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
if (code == 0) {
|
||||
pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name));
|
||||
if (pCreateTableReq->ctb.stbName == NULL) { // ignore this error code
|
||||
if (pCreateTableReq->ctb.stbName == NULL) { // ignore this error code
|
||||
tqError("failed to duplicate the stb name:%s, failed to init create-table msg and create req table", stbFullName);
|
||||
code = terrno;
|
||||
}
|
||||
|
@ -231,7 +230,7 @@ int32_t createDefaultTagColName(SArray** pColNameList) {
|
|||
}
|
||||
|
||||
int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
|
||||
int64_t gid, bool newSubTableRule) {
|
||||
int64_t gid, bool newSubTableRule) {
|
||||
if (pDataBlock->info.parTbName[0]) {
|
||||
if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) &&
|
||||
!alreadyAddGroupId(pDataBlock->info.parTbName, gid) && gid != 0 && stbFullName) {
|
||||
|
@ -245,18 +244,17 @@ int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock*
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
// tqDebug("gen name from:%s", pDataBlock->info.parTbName);
|
||||
// tqDebug("gen name from:%s", pDataBlock->info.parTbName);
|
||||
} else {
|
||||
pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
|
||||
if (pCreateTableReq->name == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
// tqDebug("copy name:%s", pDataBlock->info.parTbName);
|
||||
// tqDebug("copy name:%s", pDataBlock->info.parTbName);
|
||||
}
|
||||
} else {
|
||||
int32_t code = buildCtbNameByGroupId(stbFullName, gid, &pCreateTableReq->name);
|
||||
return code;
|
||||
// tqDebug("gen name from stbFullName:%s gid:%"PRId64, stbFullName, gid);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -264,16 +262,20 @@ int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock*
|
|||
|
||||
static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock,
|
||||
SStreamTask* pTask, int64_t suid) {
|
||||
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
SArray* tagArray = taosArrayInit(4, sizeof(STagVal));
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
int32_t code = 0;
|
||||
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
SArray* tagArray = NULL;
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
int32_t code = 0;
|
||||
STableSinkInfo* pInfo = NULL;
|
||||
SVCreateTbBatchReq reqs = {0};
|
||||
SArray* crTblArray = NULL;
|
||||
|
||||
tqDebug("s-task:%s build create %d table(s) msg", id, rows);
|
||||
SVCreateTbBatchReq reqs = {0};
|
||||
SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
|
||||
|
||||
tagArray = taosArrayInit(4, sizeof(STagVal));
|
||||
crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
|
||||
if ((NULL == reqs.pArray) || (tagArray == NULL)) {
|
||||
tqError("s-task:%s failed to init create table msg, code:%s", id, tstrerror(terrno));
|
||||
code = terrno;
|
||||
|
@ -291,6 +293,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
|
|||
tqError("s-task:%s vgId:%d failed to init create table msg", id, vgId);
|
||||
continue;
|
||||
}
|
||||
|
||||
taosArrayClear(tagArray);
|
||||
|
||||
if (size == 2) {
|
||||
|
@ -356,8 +359,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
|
|||
}
|
||||
}
|
||||
|
||||
code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid,
|
||||
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
|
||||
code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, IS_NEW_SUBTB_RULE(pTask));
|
||||
if (code) {
|
||||
goto _end;
|
||||
}
|
||||
|
@ -368,16 +370,15 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
|
|||
goto _end;
|
||||
}
|
||||
|
||||
STableSinkInfo* pInfo = NULL;
|
||||
bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, gid, &pInfo);
|
||||
bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, gid, &pInfo);
|
||||
if (!alreadyCached) {
|
||||
code = doCreateSinkInfo(pCreateTbReq->name, &pInfo);
|
||||
code = doCreateSinkTableInfo(pCreateTbReq->name, &pInfo);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to create sink tableInfo for table:%s, s-task:%s", vgId, pCreateTbReq->name, id);
|
||||
continue;
|
||||
}
|
||||
|
||||
code = doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pInfo, gid, id);
|
||||
code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pInfo, gid, id);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed to put sink tableInfo:%s into cache, s-task:%s", vgId, pCreateTbReq->name, id);
|
||||
}
|
||||
|
@ -465,45 +466,45 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
|
|||
|
||||
k += 1;
|
||||
} else {
|
||||
// check for the existance of primary key
|
||||
if (pNewRow->numOfPKs == 0) {
|
||||
// check for the existance of primary key
|
||||
if (pNewRow->numOfPKs == 0) {
|
||||
void* p = taosArrayPush(pFinal, &pNewRow);
|
||||
if (p == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
k += 1;
|
||||
j += 1;
|
||||
tRowDestroy(pOldRow);
|
||||
} else {
|
||||
numOfPk = pNewRow->numOfPKs;
|
||||
|
||||
SRowKey kNew, kOld;
|
||||
tRowGetKey(pNewRow, &kNew);
|
||||
tRowGetKey(pOldRow, &kOld);
|
||||
|
||||
int32_t ret = tRowKeyCompare(&kNew, &kOld);
|
||||
if (ret <= 0) {
|
||||
void* p = taosArrayPush(pFinal, &pNewRow);
|
||||
if (p == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
k += 1;
|
||||
j += 1;
|
||||
tRowDestroy(pOldRow);
|
||||
} else {
|
||||
numOfPk = pNewRow->numOfPKs;
|
||||
|
||||
SRowKey kNew, kOld;
|
||||
tRowGetKey(pNewRow, &kNew);
|
||||
tRowGetKey(pOldRow, &kOld);
|
||||
|
||||
int32_t ret = tRowKeyCompare(&kNew, &kOld);
|
||||
if (ret <= 0) {
|
||||
void* p = taosArrayPush(pFinal, &pNewRow);
|
||||
if (p == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
j += 1;
|
||||
|
||||
if (ret == 0) {
|
||||
k += 1;
|
||||
tRowDestroy(pOldRow);
|
||||
}
|
||||
} else {
|
||||
void* p = taosArrayPush(pFinal, &pOldRow);
|
||||
if (p == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (ret == 0) {
|
||||
k += 1;
|
||||
tRowDestroy(pOldRow);
|
||||
}
|
||||
} else {
|
||||
void* p = taosArrayPush(pFinal, &pOldRow);
|
||||
if (p == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
k += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -527,8 +528,8 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
|
|||
taosArrayDestroy(pExisted->aRowP);
|
||||
pExisted->aRowP = pFinal;
|
||||
|
||||
tqTrace("s-task:%s rows merged, final rows:%d, pk:%d uid:%" PRId64 ", existed auto-create table:%d, new-block:%d",
|
||||
id, (int32_t)taosArrayGetSize(pFinal), numOfPk, pExisted->uid, (pExisted->pCreateTbReq != NULL),
|
||||
tqTrace("s-task:%s rows merged, final rows:%d, pk:%d uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
|
||||
(int32_t)taosArrayGetSize(pFinal), numOfPk, pExisted->uid, (pExisted->pCreateTbReq != NULL),
|
||||
(pNew->pCreateTbReq != NULL));
|
||||
|
||||
tdDestroySVCreateTbReq(pNew->pCreateTbReq);
|
||||
|
@ -727,7 +728,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
|
|||
dataIndex++;
|
||||
} else {
|
||||
void* colData = colDataGetData(pColData, j);
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) { // address copy, no value
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) { // address copy, no value
|
||||
SValue sv =
|
||||
(SValue){.type = pCol->type, .nData = varDataLen(colData), .pData = (uint8_t*)varDataVal(colData)};
|
||||
SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
|
||||
|
@ -806,7 +807,7 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t doCreateSinkInfo(const char* pDstTableName, STableSinkInfo** pInfo) {
|
||||
int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo) {
|
||||
int32_t nameLen = strlen(pDstTableName);
|
||||
(*pInfo) = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1);
|
||||
if (*pInfo == NULL) {
|
||||
|
@ -830,7 +831,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
|||
STableSinkInfo* pTableSinkInfo = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, groupId, &pTableSinkInfo);
|
||||
bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, groupId, &pTableSinkInfo);
|
||||
|
||||
if (alreadyCached) {
|
||||
if (dstTableName[0] == 0) { // data block does not set the destination table name
|
||||
|
@ -870,7 +871,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
|||
}
|
||||
}
|
||||
|
||||
code = doCreateSinkInfo(dstTableName, &pTableSinkInfo);
|
||||
code = doCreateSinkTableInfo(dstTableName, &pTableSinkInfo);
|
||||
if (code == 0) {
|
||||
tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName);
|
||||
} else {
|
||||
|
@ -906,14 +907,14 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
|||
|
||||
SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
|
||||
if (pTagArray == NULL) {
|
||||
tqError("s-task:%s failed to build auto create submit msg in sink, vgId:%d, due to %s", id, vgId,
|
||||
tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
|
||||
code =
|
||||
buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
|
||||
(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1),
|
||||
&pTableData->pCreateTbReq);
|
||||
code = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
|
||||
IS_NEW_SUBTB_RULE(pTask), &pTableData->pCreateTbReq);
|
||||
taosArrayDestroy(pTagArray);
|
||||
|
||||
if (code) {
|
||||
|
@ -923,12 +924,12 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
|||
}
|
||||
|
||||
pTableSinkInfo->uid = 0;
|
||||
code = doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id);
|
||||
code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
|
||||
} else {
|
||||
metaReaderClear(&mr);
|
||||
|
||||
tqError("s-task:%s vgId:%d dst-table:%s not auto-created, and not create in tsdb, discard data", id,
|
||||
vgId, dstTableName);
|
||||
tqError("s-task:%s vgId:%d dst-table:%s not auto-created, and not create in tsdb, discard data", id, vgId,
|
||||
dstTableName);
|
||||
return TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||
}
|
||||
} else {
|
||||
|
@ -944,7 +945,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
|||
pTableSinkInfo->uid = mr.me.uid;
|
||||
|
||||
metaReaderClear(&mr);
|
||||
code = doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id);
|
||||
code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -952,8 +953,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
|
||||
SSubmitTbData* pTableData, int64_t earlyTs, const char* id) {
|
||||
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
|
||||
SSubmitTbData* pTableData, int64_t earlyTs, const char* id) {
|
||||
int32_t numOfRows = pDataBlock->info.rows;
|
||||
char* dstTableName = pDataBlock->info.parTbName;
|
||||
|
||||
|
@ -975,6 +976,43 @@ int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
const char* id = pTask->id.idStr;
|
||||
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
|
||||
if (pTask->outputInfo.tbSink.pTagSchema == NULL) {
|
||||
SMetaReader mer1 = {0};
|
||||
metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK);
|
||||
|
||||
code = metaReaderGetTableEntryByUid(&mer1, pOutputInfo->tbSink.stbUid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("s-task:%s vgId:%d failed to get the dst stable, failed to sink results", id, vgId);
|
||||
metaReaderClear(&mer1);
|
||||
return code;
|
||||
}
|
||||
|
||||
pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&mer1.me.stbEntry.schemaTag);
|
||||
metaReaderClear(&mer1);
|
||||
|
||||
if (pOutputInfo->tbSink.pTagSchema == NULL) {
|
||||
tqError("s-task:%s failed to clone tag schema, code:%s, failed to sink results", id, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SSchemaWrapper* pTagSchema = pOutputInfo->tbSink.pTagSchema;
|
||||
SSchema* pCol1 = &pTagSchema->pSchema[0];
|
||||
if (pTagSchema->nCols == 1 && pCol1->type == TSDB_DATA_TYPE_UBIGINT && strcmp(pCol1->name, "group_id") == 0) {
|
||||
pOutputInfo->tbSink.autoCreateCtb = true;
|
||||
} else {
|
||||
pOutputInfo->tbSink.autoCreateCtb = false;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||
const SArray* pBlocks = (const SArray*)data;
|
||||
SVnode* pVnode = (SVnode*)vnode;
|
||||
|
@ -988,27 +1026,9 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
|||
int64_t earlyTs = tsdbGetEarliestTs(pVnode->pTsdb);
|
||||
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
|
||||
|
||||
if (pTask->outputInfo.tbSink.pTagSchema == NULL) {
|
||||
SMetaReader mer1 = {0};
|
||||
metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK);
|
||||
|
||||
code = metaReaderGetTableEntryByUid(&mer1, pOutputInfo->tbSink.stbUid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("s-task:%s vgId:%d failed to get the dst stable, failed to sink results", id, vgId);
|
||||
metaReaderClear(&mer1);
|
||||
return;
|
||||
}
|
||||
|
||||
pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&mer1.me.stbEntry.schemaTag);
|
||||
metaReaderClear(&mer1);
|
||||
|
||||
SSchemaWrapper* pTagSchema = pOutputInfo->tbSink.pTagSchema;
|
||||
SSchema* pCol1 = &pTagSchema->pSchema[0];
|
||||
if (pTagSchema->nCols == 1 && pCol1->type == TSDB_DATA_TYPE_UBIGINT && strcmp(pCol1->name, "group_id") == 0) {
|
||||
pOutputInfo->tbSink.autoCreateCtb = true;
|
||||
} else {
|
||||
pOutputInfo->tbSink.autoCreateCtb = false;
|
||||
}
|
||||
code = checkTagSchema(pTask, pVnode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return;
|
||||
}
|
||||
|
||||
bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
|
||||
|
@ -1033,144 +1053,16 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
|||
} else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
|
||||
continue;
|
||||
} else {
|
||||
pTask->execInfo.sink.numOfBlocks += 1;
|
||||
|
||||
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
|
||||
if (submitReq.aSubmitTbData == NULL) {
|
||||
code = terrno;
|
||||
tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
|
||||
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName);
|
||||
continue;
|
||||
}
|
||||
|
||||
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
|
||||
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
|
||||
if (tbData.pCreateTbReq != NULL) {
|
||||
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
|
||||
(void) doRemoveFromCache(pTask->outputInfo.tbSink.pTblInfo, pDataBlock->info.id.groupId, id);
|
||||
tbData.pCreateTbReq = NULL;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
|
||||
if (p == NULL) {
|
||||
tqDebug("vgId:%d, s-task:%s failed to build submit msg, data lost", vgId, id);
|
||||
}
|
||||
|
||||
code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1);
|
||||
if (code) { // failed and continue
|
||||
tqDebug("vgId:%d, s-task:%s submit msg failed, data lost", vgId, id);
|
||||
}
|
||||
code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
|
||||
SHashObj* pTableIndexMap =
|
||||
taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
|
||||
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
|
||||
if (submitReq.aSubmitTbData == NULL) {
|
||||
code = terrno;
|
||||
tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
|
||||
taosHashCleanup(pTableIndexMap);
|
||||
if (streamTaskShouldStop(pTask)) {
|
||||
return;
|
||||
}
|
||||
|
||||
bool hasSubmit = false;
|
||||
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||
if (streamTaskShouldStop(pTask)) {
|
||||
taosHashCleanup(pTableIndexMap);
|
||||
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
|
||||
return;
|
||||
}
|
||||
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
if (pDataBlock == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pDataBlock->info.type == STREAM_CHECKPOINT) {
|
||||
continue;
|
||||
}
|
||||
|
||||
hasSubmit = true;
|
||||
pTask->execInfo.sink.numOfBlocks += 1;
|
||||
uint64_t groupId = pDataBlock->info.id.groupId;
|
||||
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
|
||||
|
||||
int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
|
||||
if (index == NULL) { // no data yet, append it
|
||||
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("vgId:%d dst-table gid:%" PRId64 " not exist, discard stream results", vgId, groupId);
|
||||
continue;
|
||||
}
|
||||
|
||||
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
|
||||
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
|
||||
if (tbData.pCreateTbReq != NULL) {
|
||||
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
|
||||
(void) doRemoveFromCache(pTask->outputInfo.tbSink.pTblInfo, groupId, id);
|
||||
tbData.pCreateTbReq = NULL;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
|
||||
if (p == NULL) {
|
||||
tqError("vgId:%d, s-task:%s failed to build submit msg, data lost", vgId, id);
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
|
||||
code = taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
|
||||
if (code) {
|
||||
tqError("vgId:%d, s-task:%s failed to put group into index map, code:%s", vgId, id, tstrerror(code));
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
|
||||
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
|
||||
if (tbData.pCreateTbReq != NULL) {
|
||||
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
|
||||
tbData.pCreateTbReq = NULL;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index);
|
||||
if (pExisted == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
code = doMergeExistedRows(pExisted, &tbData, id);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
pTask->execInfo.sink.numOfRows += pDataBlock->info.rows;
|
||||
}
|
||||
|
||||
taosHashCleanup(pTableIndexMap);
|
||||
|
||||
if (hasSubmit) {
|
||||
code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks);
|
||||
if (code) { // failed and continue
|
||||
tqError("vgId:%d failed to build and send submit msg", vgId);
|
||||
}
|
||||
} else {
|
||||
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
|
||||
tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
|
||||
}
|
||||
reubuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1190,7 +1082,7 @@ bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
|
|||
return true;
|
||||
}
|
||||
|
||||
int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
|
||||
int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
|
||||
int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFreeClear(pTableSinkInfo);
|
||||
|
@ -1202,7 +1094,17 @@ int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo,
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t doRemoveFromCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id) {
|
||||
bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo) {
|
||||
void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
|
||||
if (pVal) {
|
||||
*pInfo = *(STableSinkInfo**)pVal;
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id) {
|
||||
if (tSimpleHashGetSize(pSinkTableMap) == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1223,8 +1125,8 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock*
|
|||
return terrno;
|
||||
}
|
||||
|
||||
int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr,
|
||||
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
|
||||
int32_t code =
|
||||
tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr, IS_NEW_SUBTB_RULE(pTask));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -1262,3 +1164,155 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock*
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) {
|
||||
int32_t code = 0;
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
int32_t numOfBlocks = taosArrayGetSize(pBlocks);
|
||||
int64_t suid = pTask->outputInfo.tbSink.stbUid;
|
||||
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
|
||||
char* stbFullName = pTask->outputInfo.tbSink.stbFullName;
|
||||
|
||||
SHashObj* pTableIndexMap =
|
||||
taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
|
||||
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
|
||||
if (submitReq.aSubmitTbData == NULL) {
|
||||
code = terrno;
|
||||
tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code));
|
||||
taosHashCleanup(pTableIndexMap);
|
||||
return;
|
||||
}
|
||||
|
||||
bool hasSubmit = false;
|
||||
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
if (pDataBlock == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pDataBlock->info.type == STREAM_CHECKPOINT) {
|
||||
continue;
|
||||
}
|
||||
|
||||
hasSubmit = true;
|
||||
pTask->execInfo.sink.numOfBlocks += 1;
|
||||
uint64_t groupId = pDataBlock->info.id.groupId;
|
||||
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
|
||||
|
||||
int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
|
||||
if (index == NULL) { // no data yet, append it
|
||||
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("vgId:%d dst-table gid:%" PRId64 " not exist, discard stream results", vgId, groupId);
|
||||
continue;
|
||||
}
|
||||
|
||||
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
|
||||
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
|
||||
if (tbData.pCreateTbReq != NULL) {
|
||||
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
|
||||
(void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, groupId, id);
|
||||
tbData.pCreateTbReq = NULL;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
|
||||
if (p == NULL) {
|
||||
tqError("vgId:%d, s-task:%s failed to build submit msg, data lost", vgId, id);
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1;
|
||||
code = taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size));
|
||||
if (code) {
|
||||
tqError("vgId:%d, s-task:%s failed to put group into index map, code:%s", vgId, id, tstrerror(code));
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id);
|
||||
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
|
||||
if (tbData.pCreateTbReq != NULL) {
|
||||
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
|
||||
tbData.pCreateTbReq = NULL;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index);
|
||||
if (pExisted == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
code = doMergeExistedRows(pExisted, &tbData, id);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
pTask->execInfo.sink.numOfRows += pDataBlock->info.rows;
|
||||
}
|
||||
|
||||
taosHashCleanup(pTableIndexMap);
|
||||
|
||||
if (hasSubmit) {
|
||||
code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks);
|
||||
if (code) { // failed and continue
|
||||
tqError("vgId:%d failed to build and send submit msg", vgId);
|
||||
}
|
||||
} else {
|
||||
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
|
||||
tqDebug("vgId:%d, s-task:%s write results completed", vgId, id);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode, int64_t earlyTs) {
|
||||
int32_t code = 0;
|
||||
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
|
||||
int64_t suid = pTask->outputInfo.tbSink.stbUid;
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t vgId = TD_VID(pVnode);
|
||||
char* stbFullName = pTask->outputInfo.tbSink.stbFullName;
|
||||
|
||||
pTask->execInfo.sink.numOfBlocks += 1;
|
||||
|
||||
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
|
||||
if (submitReq.aSubmitTbData == NULL) {
|
||||
tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP};
|
||||
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = tqSetDstTableDataPayload(suid, pTSchema, index, pDataBlock, &tbData, earlyTs, id);
|
||||
if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) {
|
||||
if (tbData.pCreateTbReq != NULL) {
|
||||
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
|
||||
(void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, id);
|
||||
tbData.pCreateTbReq = NULL;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
|
||||
if (p == NULL) {
|
||||
tqDebug("vgId:%d, s-task:%s failed to build submit msg, code:%s, data lost", vgId, id, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1);
|
||||
if (code) { // failed and continue
|
||||
tqDebug("vgId:%d, s-task:%s submit msg failed, code:%s data lost", vgId, id, tstrerror(code));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -224,19 +224,18 @@ int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value, int32_t *index
|
|||
|
||||
*index = -1;
|
||||
|
||||
if (v > pBucket->range.dMaxVal || v < pBucket->range.dMinVal) {
|
||||
if (v > pBucket->range.dMaxVal || v < pBucket->range.dMinVal || isnan(v)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// divide a range of [dMinVal, dMaxVal] into 1024 buckets
|
||||
double span = pBucket->range.dMaxVal - pBucket->range.dMinVal;
|
||||
if (span < pBucket->numOfSlots) {
|
||||
int32_t delta = (int32_t)(v - pBucket->range.dMinVal);
|
||||
*index = (delta % pBucket->numOfSlots);
|
||||
if (fabs(span) < DBL_EPSILON) {
|
||||
*index = 0;
|
||||
} else {
|
||||
double slotSpan = span / pBucket->numOfSlots;
|
||||
*index = (int32_t)((v - pBucket->range.dMinVal) / slotSpan);
|
||||
if (v == pBucket->range.dMaxVal) {
|
||||
if (fabs(v - pBucket->range.dMaxVal) < DBL_EPSILON) {
|
||||
*index -= 1;
|
||||
}
|
||||
}
|
||||
|
@ -583,48 +582,52 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction
|
|||
*result = getIdenticalDataVal(pMemBucket, i);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// try next round
|
||||
pMemBucket->times += 1;
|
||||
// qDebug("MemBucket:%p, start next round data bucketing, time:%d", pMemBucket, pMemBucket->times);
|
||||
|
||||
pMemBucket->range = pSlot->range;
|
||||
pMemBucket->total = 0;
|
||||
|
||||
resetSlotInfo(pMemBucket);
|
||||
|
||||
int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times - 1);
|
||||
tMemBucket *tmpBucket = NULL;
|
||||
int32_t code = tMemBucketCreate(pMemBucket->bytes, pMemBucket->type, pSlot->range.dMinVal, pSlot->range.dMaxVal,
|
||||
false, &tmpBucket);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
tMemBucketDestroy(&tmpBucket);
|
||||
return code;
|
||||
}
|
||||
int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times);
|
||||
|
||||
SArray* list;
|
||||
void *p = taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId));
|
||||
if (p != NULL) {
|
||||
list = *(SArray **)p;
|
||||
if (list == NULL || list->size <= 0) {
|
||||
tMemBucketDestroy(&tmpBucket);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
tMemBucketDestroy(&tmpBucket);
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t f = 0; f < list->size; ++f) {
|
||||
int32_t *pageId = taosArrayGet(list, f);
|
||||
if (NULL == pageId) {
|
||||
tMemBucketDestroy(&tmpBucket);
|
||||
return TSDB_CODE_OUT_OF_RANGE;
|
||||
}
|
||||
SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId);
|
||||
if (pg == NULL) {
|
||||
tMemBucketDestroy(&tmpBucket);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t code = tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num);
|
||||
code = tMemBucketPut(tmpBucket, pg->data, (int32_t)pg->num);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tMemBucketDestroy(&tmpBucket);
|
||||
return code;
|
||||
}
|
||||
setBufPageDirty(pg, true);
|
||||
releaseBufPage(pMemBucket->pBuffer, pg);
|
||||
}
|
||||
|
||||
return getPercentileImpl(pMemBucket, count - num, fraction, result);
|
||||
code = getPercentileImpl(tmpBucket, count - num, fraction, result);
|
||||
tMemBucketDestroy(&tmpBucket);
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
num += pSlot->info.size;
|
||||
|
|
|
@ -887,7 +887,8 @@ _err:
|
|||
}
|
||||
|
||||
static int32_t addParamToLogicConditionNode(SLogicConditionNode* pCond, SNode* pParam) {
|
||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam) && pCond->condType == ((SLogicConditionNode*)pParam)->condType) {
|
||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam) && pCond->condType == ((SLogicConditionNode*)pParam)->condType &&
|
||||
((SLogicConditionNode*)pParam)->condType != LOGIC_COND_TYPE_NOT) {
|
||||
int32_t code = nodesListAppendList(pCond->pParameterList, ((SLogicConditionNode*)pParam)->pParameterList);
|
||||
((SLogicConditionNode*)pParam)->pParameterList = NULL;
|
||||
nodesDestroyNode(pParam);
|
||||
|
|
|
@ -4679,6 +4679,9 @@ EDealRes fltReviseRewriter(SNode **pNode, void *pContext) {
|
|||
|
||||
cell = cell->pNext;
|
||||
}
|
||||
if (node->condType == LOGIC_COND_TYPE_NOT) {
|
||||
stat->scalarMode = true;
|
||||
}
|
||||
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
|
|
@ -291,7 +291,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
||||
tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper);
|
||||
taosMemoryFree(pTask->outputInfo.tbSink.pTSchema);
|
||||
tSimpleHashCleanup(pTask->outputInfo.tbSink.pTblInfo);
|
||||
tSimpleHashCleanup(pTask->outputInfo.tbSink.pTbInfo);
|
||||
tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pTagSchema);
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||
|
|
|
@ -225,6 +225,7 @@ python3 test.py -f query/distinctOneColTb.py
|
|||
python3 ./test.py -f query/filter.py
|
||||
python3 ./test.py -f query/filterCombo.py
|
||||
python3 ./test.py -f query/queryNormal.py
|
||||
python3 ./test.py -f query/not.py
|
||||
python3 ./test.py -f query/queryError.py
|
||||
python3 ./test.py -f query/filterAllIntTypes.py
|
||||
python3 ./test.py -f query/filterFloatAndDouble.py
|
||||
|
|
|
@ -139,6 +139,7 @@ python3 ./test.py -f query/querySort.py
|
|||
python3 ./test.py -f query/queryJoin.py
|
||||
python3 ./test.py -f query/filterCombo.py
|
||||
python3 ./test.py -f query/queryNormal.py
|
||||
python3 ./test.py -f query/not.py
|
||||
python3 ./test.py -f query/select_last_crash.py
|
||||
python3 ./test.py -f query/queryNullValueTest.py
|
||||
python3 ./test.py -f query/queryInsertValue.py
|
||||
|
|
|
@ -0,0 +1,132 @@
|
|||
from wsgiref.headers import tspecials
|
||||
from util.log import *
|
||||
from util.cases import *
|
||||
from util.sql import *
|
||||
from util.common import tdCom
|
||||
import numpy as np
|
||||
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor())
|
||||
|
||||
self.dbname = "db"
|
||||
self.rowNum = 10
|
||||
self.ts = 1537146000000
|
||||
|
||||
def notConditionTest(self):
|
||||
dbname = "nottest"
|
||||
stbname = "st1"
|
||||
|
||||
tdsql = tdCom.newTdSql()
|
||||
tdsql.execute(f"create database if not exists {dbname}")
|
||||
|
||||
stype = ["INT", "INT UNSIGNED", "BIGINT", "BIGINT UNSIGNED", "DOUBLE", "FLOAT", "SMALLINT", "SMALLINT UNSIGNED", "TINYINT", "TINYINT UNSIGNED"]
|
||||
|
||||
for type_name in stype:
|
||||
tdsql.execute(f"drop table if exists {dbname}.{stbname}")
|
||||
tdsql.execute(f"create table if not exists {dbname}.{stbname} (ts timestamp, v1 {type_name}) tags(t1 {type_name})")
|
||||
tdsql.execute(f"insert into {dbname}.sub_1 using {dbname}.{stbname} tags(1) values({self.ts}, 10)")
|
||||
tdsql.execute(f"insert into {dbname}.sub_2 using {dbname}.{stbname} tags(2) values({self.ts + 1000}, 20)")
|
||||
tdsql.execute(f"insert into {dbname}.sub_3 using {dbname}.{stbname} tags(3) values({self.ts + 2000}, 30)")
|
||||
|
||||
# Test case 1: NOT IN
|
||||
tdsql.query(f"select t1, * from {dbname}.{stbname} where t1 not in (1, 2) order by t1")
|
||||
tdsql.checkRows(1)
|
||||
tdsql.checkData(0, 0, 3)
|
||||
|
||||
# Test case 2: NOT BETWEEN
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where v1 not between 10 and 20 order by t1")
|
||||
tdsql.checkRows(1)
|
||||
tdsql.checkData(0, 1, 30)
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where not(v1 not between 10 and 20) order by t1")
|
||||
tdsql.checkRows(2)
|
||||
|
||||
# Test case 4: NOT EQUAL
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where v1 != 20 order by t1")
|
||||
tdsql.checkRows(2)
|
||||
tdsql.checkData(0, 1, 10)
|
||||
tdsql.checkData(1, 1, 30)
|
||||
|
||||
# Test case 8: NOT (v1 < 20 OR v1 > 30)
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where not (v1 < 20 or v1 > 30) order by t1")
|
||||
tdsql.checkRows(2)
|
||||
tdsql.checkData(0, 1, 20)
|
||||
tdsql.checkData(1, 1, 30)
|
||||
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where not (v1 < 20 or v1 >= 30) order by t1")
|
||||
tdsql.checkRows(1)
|
||||
|
||||
# Test case 9: NOT (t1 != 1)
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where not (t1 != 1) order by t1")
|
||||
tdsql.checkRows(1)
|
||||
tdsql.checkData(0, 1, 10)
|
||||
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where (t1 != 1) or not (v1 == 20) order by t1")
|
||||
tdsql.checkRows(3)
|
||||
tdsql.checkData(0, 1, 10)
|
||||
tdsql.checkData(1, 1, 20)
|
||||
tdsql.checkData(2, 1, 30)
|
||||
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where not((t1 != 1) or not (v1 == 20)) order by t1")
|
||||
tdsql.checkRows(0)
|
||||
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where not (t1 != 1) and not (v1 != 20) order by t1")
|
||||
tdsql.checkRows(0)
|
||||
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where not(not (t1 != 1) and not (v1 != 20)) order by t1")
|
||||
tdsql.checkRows(3)
|
||||
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where not (t1 != 1) and not (v1 != 10) order by t1")
|
||||
tdsql.checkRows(1)
|
||||
tdsql.checkData(0, 1, 10)
|
||||
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where not (t1 > 2) order by t1")
|
||||
tdsql.checkRows(2)
|
||||
tdsql.checkData(0, 1, 10)
|
||||
tdsql.checkData(1, 1, 20)
|
||||
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where not (t1 == 2) order by t1")
|
||||
tdsql.checkRows(2)
|
||||
tdsql.checkData(0, 1, 10)
|
||||
tdsql.checkData(1, 1, 30)
|
||||
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where not (v1 > 10 and v1 < 30) order by t1")
|
||||
tdsql.checkRows(2)
|
||||
tdsql.checkData(0, 1, 10)
|
||||
tdsql.checkData(1, 1, 30)
|
||||
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where not(not (v1 < 20 or v1 > 30)) order by t1")
|
||||
tdsql.checkRows(1)
|
||||
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where not(not (v1 < 20 or v1 >= 30)) order by t1")
|
||||
tdsql.checkRows(2)
|
||||
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where not(not (t1 != 1)) order by t1")
|
||||
tdsql.checkRows(2)
|
||||
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where not(not (t1 > 2)) order by t1")
|
||||
tdsql.checkRows(1)
|
||||
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where not(not (t1 == 2)) order by t1")
|
||||
tdsql.checkRows(1)
|
||||
|
||||
tdsql.query(f"select * from {dbname}.{stbname} where not(not (v1 > 10 and v1 < 30)) order by t1")
|
||||
tdsql.checkRows(1)
|
||||
|
||||
def run(self):
|
||||
dbname = "db"
|
||||
tdSql.prepare()
|
||||
|
||||
self.notConditionTest()
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -245,6 +245,8 @@ python3 ./test.py -f 2-query/min.py -P
|
|||
python3 ./test.py -f 2-query/min.py -P -R
|
||||
python3 ./test.py -f 2-query/normal.py -P
|
||||
python3 ./test.py -f 2-query/normal.py -P -R
|
||||
python3 ./test.py -f 2-query/not.py -P
|
||||
python3 ./test.py -f 2-query/not.py -P -R
|
||||
python3 ./test.py -f 2-query/mode.py -P
|
||||
python3 ./test.py -f 2-query/mode.py -P -R
|
||||
python3 ./test.py -f 2-query/Now.py -P
|
||||
|
@ -427,6 +429,7 @@ python3 ./test.py -f 2-query/Today.py -P -Q 2
|
|||
python3 ./test.py -f 2-query/max.py -P -Q 2
|
||||
python3 ./test.py -f 2-query/min.py -P -Q 2
|
||||
python3 ./test.py -f 2-query/normal.py -P -Q 2
|
||||
python3 ./test.py -f 2-query/not.py -P -Q 2
|
||||
python3 ./test.py -f 2-query/mode.py -P -Q 2
|
||||
python3 ./test.py -f 2-query/count.py -P -Q 2
|
||||
python3 ./test.py -f 2-query/countAlwaysReturnValue.py -P -Q 2
|
||||
|
@ -526,6 +529,7 @@ python3 ./test.py -f 2-query/Today.py -P -Q 3
|
|||
python3 ./test.py -f 2-query/max.py -P -Q 3
|
||||
python3 ./test.py -f 2-query/min.py -P -Q 3
|
||||
python3 ./test.py -f 2-query/normal.py -P -Q 3
|
||||
python3 ./test.py -f 2-query/not.py -P -Q 3
|
||||
python3 ./test.py -f 2-query/mode.py -P -Q 3
|
||||
python3 ./test.py -f 2-query/count.py -P -Q 3
|
||||
python3 ./test.py -f 2-query/countAlwaysReturnValue.py -P -Q 3
|
||||
|
@ -624,6 +628,7 @@ python3 ./test.py -f 2-query/Today.py -P -Q 4
|
|||
python3 ./test.py -f 2-query/max.py -P -Q 4
|
||||
python3 ./test.py -f 2-query/min.py -P -Q 4
|
||||
python3 ./test.py -f 2-query/normal.py -P -Q 4
|
||||
python3 ./test.py -f 2-query/not.py -P -Q 4
|
||||
python3 ./test.py -f 2-query/mode.py -P -Q 4
|
||||
python3 ./test.py -f 2-query/count.py -P -Q 4
|
||||
python3 ./test.py -f 2-query/countAlwaysReturnValue.py -P -Q 4
|
||||
|
|
Loading…
Reference in New Issue