Merge branch 'main' into enh/TD-22404-M

This commit is contained in:
kailixu 2023-05-16 15:17:42 +08:00
commit 9faefb7498
18 changed files with 305 additions and 172 deletions

View File

@ -52,7 +52,7 @@ TDengine 还提供一组辅助工具软件 taosTools目前它包含 taosBench
### Ubuntu 18.04 及以上版本 & Debian
```bash
sudo apt-get install -y gcc cmake build-essential git libssl-dev
sudo apt-get install -y gcc cmake build-essential git libssl-dev libgflags2.2 libgflags-dev
```
#### 为 taos-tools 安装编译需要的软件

View File

@ -60,7 +60,7 @@ To build TDengine, use [CMake](https://cmake.org/) 3.0.2 or higher versions in t
### Ubuntu 18.04 and above or Debian
```bash
sudo apt-get install -y gcc cmake build-essential git libssl-dev
sudo apt-get install -y gcc cmake build-essential git libssl-dev libgflags2.2 libgflags-dev
```
#### Install build dependencies for taosTools

View File

@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG ae8d51c
GIT_TAG 565ca21
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -2742,7 +2742,7 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t
// varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
//
// SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)stbName, false);
// colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false);
//
// char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
// tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
@ -2750,29 +2750,29 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t
// varDataSetLen(db, strlen(varDataVal(db)));
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)db, false);
// colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->createdTime, false);
// colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->createdTime, false);
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false);
// colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false);
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false);
// colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false);
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables
// colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// if (pStb->commentLen > 0) {
// char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
// STR_TO_VARSTR(comment, pStb->comment);
// colDataAppend(pColInfo, numOfRows, comment, false);
// colDataSetVal(pColInfo, numOfRows, comment, false);
// } else if (pStb->commentLen == 0) {
// char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
// STR_TO_VARSTR(comment, "");
// colDataAppend(pColInfo, numOfRows, comment, false);
// colDataSetVal(pColInfo, numOfRows, comment, false);
// } else {
// colDataSetNULL(pColInfo, numOfRows);
// }
@ -2782,14 +2782,14 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t
// varDataSetLen(watermark, strlen(varDataVal(watermark)));
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)watermark, false);
// colDataSetVal(pColInfo, numOfRows, (const char *)watermark, false);
//
// char maxDelay[64 + VARSTR_HEADER_SIZE] = {0};
// sprintf(varDataVal(maxDelay), "%" PRId64 "a,%" PRId64 "a", pStb->maxdelay[0], pStb->maxdelay[1]);
// varDataSetLen(maxDelay, strlen(varDataVal(maxDelay)));
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)maxDelay, false);
// colDataSetVal(pColInfo, numOfRows, (const char *)maxDelay, false);
//
// char rollup[160 + VARSTR_HEADER_SIZE] = {0};
// int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
@ -2808,7 +2808,7 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t
// varDataSetLen(rollup, strlen(varDataVal(rollup)));
//
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, numOfRows, (const char *)rollup, false);
// colDataSetVal(pColInfo, numOfRows, (const char *)rollup, false);
//
// numOfRows++;
// sdbRelease(pSdb, pStb);
@ -3067,20 +3067,20 @@ static int32_t buildDbColsInfoBlock(const SSDataBlock *p, const SSysTableMeta *p
for (int32_t j = 0; j < pm->colNum; j++) {
// table name
SColumnInfoData *pColInfoData = taosArrayGet(p->pDataBlock, 0);
colDataAppend(pColInfoData, numOfRows, tName, false);
colDataSetVal(pColInfoData, numOfRows, tName, false);
// database name
pColInfoData = taosArrayGet(p->pDataBlock, 1);
colDataAppend(pColInfoData, numOfRows, dName, false);
colDataSetVal(pColInfoData, numOfRows, dName, false);
pColInfoData = taosArrayGet(p->pDataBlock, 2);
colDataAppend(pColInfoData, numOfRows, typeName, false);
colDataSetVal(pColInfoData, numOfRows, typeName, false);
// col name
char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(colName, pm->schema[j].name);
pColInfoData = taosArrayGet(p->pDataBlock, 3);
colDataAppend(pColInfoData, numOfRows, colName, false);
colDataSetVal(pColInfoData, numOfRows, colName, false);
// col type
int8_t colType = pm->schema[j].type;
@ -3095,10 +3095,10 @@ static int32_t buildDbColsInfoBlock(const SSDataBlock *p, const SSysTableMeta *p
(int32_t)((pm->schema[j].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
varDataSetLen(colTypeStr, colTypeLen);
colDataAppend(pColInfoData, numOfRows, (char *)colTypeStr, false);
colDataSetVal(pColInfoData, numOfRows, (char *)colTypeStr, false);
pColInfoData = taosArrayGet(p->pDataBlock, 5);
colDataAppend(pColInfoData, numOfRows, (const char *)&pm->schema[j].bytes, false);
colDataSetVal(pColInfoData, numOfRows, (const char *)&pm->schema[j].bytes, false);
for (int32_t k = 6; k <= 8; ++k) {
pColInfoData = taosArrayGet(p->pDataBlock, k);
colDataSetNULL(pColInfoData, numOfRows);
@ -3192,19 +3192,19 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
for (int i = 0; i < pStb->numOfColumns; i++) {
int32_t cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)stbName, false);
colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)db, false);
colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, typeName, false);
colDataSetVal(pColInfo, numOfRows, typeName, false);
// col name
char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(colName, pStb->pColumns[i].name);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, colName, false);
colDataSetVal(pColInfo, numOfRows, colName, false);
// col type
int8_t colType = pStb->pColumns[i].type;
@ -3219,10 +3219,10 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
(int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
varDataSetLen(colTypeStr, colTypeLen);
colDataAppend(pColInfo, numOfRows, (char *)colTypeStr, false);
colDataSetVal(pColInfo, numOfRows, (char *)colTypeStr, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStb->pColumns[i].bytes, false);
colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->pColumns[i].bytes, false);
while (cols < pShow->numOfColumns) {
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows);

View File

@ -102,6 +102,7 @@ typedef struct {
STqExecHandle execHandle; // exec
SRpcMsg* msg;
int32_t noDataPollCnt;
int8_t exec;
} STqHandle;
typedef struct {
@ -184,6 +185,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
bool tqIsHandleExecuting(STqHandle* pHandle);
#ifdef __cplusplus
}

View File

@ -379,15 +379,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
int32_t code = 0;
// taosWLockLatch(&pTq->lock);
// int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
// if (code != 0) {
// tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
// }
// taosWUnLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (pHandle) {
@ -395,6 +390,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (pHandle->pRef) {
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
}
while (tqIsHandleExecuting(pHandle)) {
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5);
}
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (code != 0) {
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);

View File

@ -55,6 +55,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
} else {
tqPushDataRsp(pTq, pHandle);
void* tmp = pHandle->msg->pCont;
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = tmp;

View File

@ -503,6 +503,74 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap
return 0;
}
static int32_t buildResSDataBlock(SSDataBlock* pBlock, SSchemaWrapper* pSchema, const SArray* pColIdList) {
if (blockDataGetNumOfCols(pBlock) > 0) {
return TSDB_CODE_SUCCESS;
}
int32_t numOfCols = taosArrayGetSize(pColIdList);
if (numOfCols == 0) { // all columns are required
for (int32_t i = 0; i < pSchema->nCols; ++i) {
SSchema* pColSchema = &pSchema->pSchema[i];
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
blockDataFreeRes(pBlock);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
} else {
if (numOfCols > pSchema->nCols) {
numOfCols = pSchema->nCols;
}
int32_t i = 0;
int32_t j = 0;
while (i < pSchema->nCols && j < numOfCols) {
SSchema* pColSchema = &pSchema->pSchema[i];
col_id_t colIdSchema = pColSchema->colId;
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pColIdList, j);
if (colIdSchema < colIdNeed) {
i++;
} else if (colIdSchema > colIdNeed) {
j++;
} else {
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
return -1;
}
i++;
j++;
}
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
int32_t code = TSDB_CODE_SUCCESS;
if (IS_STR_DATA_TYPE(pColVal->type)) {
char val[65535 + 2] = {0};
if (pColVal->value.pData != NULL) {
memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
varDataSetLen(val, pColVal->value.nData);
code = colDataSetVal(pColumnInfoData, rowIndex, val, !COL_VAL_IS_VALUE(pColVal));
} else {
colDataSetNULL(pColumnInfoData, rowIndex);
}
} else {
code = colDataSetVal(pColumnInfoData, rowIndex, (void*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal));
}
return code;
}
int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) {
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
@ -538,53 +606,11 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet
pReader->cachedSchemaSuid = suid;
pReader->cachedSchemaVer = sversion;
SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
if (blockDataGetNumOfCols(pBlock) > 0) {
blockDataDestroy(pReader->pResBlock);
pReader->pResBlock = createDataBlock();
pBlock = pReader->pResBlock;
pBlock->info.id.uid = uid;
pBlock->info.version = pReader->msg.ver;
}
int32_t numOfCols = taosArrayGetSize(pReader->pColIdList);
if (numOfCols == 0) { // all columns are required
for (int32_t i = 0; i < pSchemaWrapper->nCols; ++i) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[i];
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
blockDataFreeRes(pBlock);
return -1;
}
}
} else {
if (numOfCols > pSchemaWrapper->nCols) {
numOfCols = pSchemaWrapper->nCols;
}
int32_t i = 0;
int32_t j = 0;
while (i < pSchemaWrapper->nCols && j < numOfCols) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[i];
col_id_t colIdSchema = pColSchema->colId;
col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, j);
if (colIdSchema < colIdNeed) {
i++;
} else if (colIdSchema > colIdNeed) {
j++;
} else {
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
return -1;
}
i++;
j++;
}
ASSERT(pReader->cachedSchemaVer == pReader->pSchemaWrapper->version);
if (blockDataGetNumOfCols(pBlock) == 0) {
int32_t code = buildResSDataBlock(pReader->pResBlock, pReader->pSchemaWrapper, pReader->pColIdList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
@ -632,30 +658,15 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet
} else if (pCol->cid == pColData->info.colId) {
for (int32_t i = 0; i < pCol->nVal; i++) {
tColDataGetValue(pCol, i, &colVal);
if (IS_STR_DATA_TYPE(colVal.type)) {
if (colVal.value.pData != NULL) {
char val[65535 + 2] = {0};
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
varDataSetLen(val, colVal.value.nData);
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
return -1;
}
} else {
colDataSetNULL(pColData, i);
}
} else {
if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
return -1;
}
int32_t code = doSetVal(pColData, i, &colVal);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
sourceIdx++;
targetIdx++;
} else {
for (int32_t i = 0; i < pCol->nVal; i++) {
colDataSetNULL(pColData, i);
}
colDataSetNNULL(pColData, 0, pCol->nVal);
targetIdx++;
}
}
@ -681,21 +692,9 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet
sourceIdx++;
continue;
} else if (colVal.cid == pColData->info.colId) {
if (IS_STR_DATA_TYPE(colVal.type)) {
if (colVal.value.pData != NULL) {
char val[65535 + 2] = {0};
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
varDataSetLen(val, colVal.value.nData);
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
return -1;
}
} else {
colDataSetNULL(pColData, i);
}
} else {
if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
return -1;
}
int32_t code = doSetVal(pColData, i, &colVal);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
sourceIdx++;
@ -833,14 +832,14 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
char val[65535 + 2];
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
varDataSetLen(val, colVal.value.nData);
if (colDataAppend(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
if (colDataSetVal(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
} else {
colDataSetNULL(pColData, curRow - lastRow);
}
} else {
if (colDataAppend(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
if (colDataSetVal(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
}
@ -930,14 +929,14 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
char val[65535 + 2];
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
varDataSetLen(val, colVal.value.nData);
if (colDataAppend(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
if (colDataSetVal(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
} else {
colDataSetNULL(pColData, curRow - lastRow);
}
} else {
if (colDataAppend(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
if (colDataSetVal(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
}

View File

@ -162,6 +162,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0;
}
bool tqIsHandleExecuting(STqHandle* pHandle) { return 1 == atomic_load_8(&pHandle->exec); }
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
uint64_t consumerId = pRequest->consumerId;
@ -170,12 +172,12 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
qTaskInfo_t task = pHandle->execHandle.task;
if(qTaskIsExecuting(task)){
code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
tDeleteSMqDataRsp(&dataRsp);
return code;
while(tqIsHandleExecuting(pHandle)){
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5);
}
atomic_store_8(&pHandle->exec, 1);
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
@ -193,6 +195,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
taosWUnLockLatch(&pTq->lock);
tDeleteSMqDataRsp(&dataRsp);
atomic_store_8(&pHandle->exec, 0);
return code;
}
else{
@ -200,10 +203,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
}
}
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
// NOTE: this pHandle->consumerId may have been changed already.
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
end:
{
@ -211,31 +212,33 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
tFormatOffset(buf, 80, &dataRsp.rspOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d",
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
// taosWUnLockLatch(&pTq->lock);
tDeleteSMqDataRsp(&dataRsp);
}
atomic_store_8(&pHandle->exec, 0);
return code;
}
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) {
int code = 0;
int code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
SWalCkHead* pCkHead = NULL;
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
qTaskInfo_t task = pHandle->execHandle.task;
if(qTaskIsExecuting(task)){
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
return code;
while(tqIsHandleExecuting(pHandle)){
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5);
}
atomic_store_8(&pHandle->exec, 1);
if (offset->type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
tDeleteSTaosxRsp(&taosxRsp);
return -1;
code = -1;
goto end;
}
if (metaRsp.metaRspLen > 0) {
@ -243,30 +246,27 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts);
taosMemoryFree(metaRsp.metaRsp);
tDeleteSTaosxRsp(&taosxRsp);
return code;
goto end;
}
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
",ts:%" PRId64,pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,taosxRsp.rspOffset.ts);
if (taosxRsp.blockNum > 0) {
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
return code;
goto end;
}else {
*offset = taosxRsp.rspOffset;
}
}
if (offset->type == TMQ_OFFSET__LOG) {
verifyOffset(pHandle->pWalReader, offset);
int64_t fetchVer = offset->version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
tDeleteSTaosxRsp(&taosxRsp);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
code = -1;
goto end;
}
walSetReaderCapacity(pHandle->pWalReader, 2048);
int totalRows = 0;
@ -281,9 +281,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code;
goto end;
}
SWalCont* pHead = &pCkHead->head;
@ -295,9 +293,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if(totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code;
goto end;
}
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
@ -305,17 +301,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body;
if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) {
code = -1;
taosMemoryFreeClear(pCkHead);
tDeleteSTaosxRsp(&taosxRsp);
return code;
}
code = 0;
taosMemoryFreeClear(pCkHead);
tDeleteSTaosxRsp(&taosxRsp);
return code;
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
goto end;
}
// process data
@ -325,29 +312,28 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
.ver = pHead->version,
};
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows) < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
pRequest->subKey);
taosMemoryFreeClear(pCkHead);
tDeleteSTaosxRsp(&taosxRsp);
return -1;
code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows);
if (code < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, pRequest->subKey);
goto end;
}
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code;
goto end;
} else {
fetchVer++;
}
}
}
end:
atomic_store_8(&pHandle->exec, 0);
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return 0;
return code;
}
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {

View File

@ -1437,12 +1437,12 @@ _return:
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
pRes->code = code;
pRes->pRes = NULL;
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname,
tstrerror(code));
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pResList);
taskDone = true;
}
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname,
tstrerror(code));
}
if (pTask->res && taskDone) {

View File

@ -244,6 +244,11 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
break;
}
if (end.key != INT64_MIN && end.key < pSliceInfo->current) {
hasInterp = false;
break;
}
if (start.key == INT64_MIN || end.key == INT64_MIN) {
colDataSetNULL(pDst, rows);
break;

View File

@ -388,6 +388,9 @@ static bool isSetUselessCol(SSetOperator* pSetOp, int32_t index, SExprNode* pPro
}
static int32_t calcConstSetOpProjections(SCalcConstContext* pCxt, SSetOperator* pSetOp, bool subquery) {
if (subquery && pSetOp->opType == SET_OP_TYPE_UNION) {
return TSDB_CODE_SUCCESS;
}
int32_t index = 0;
SNode* pProj = NULL;
WHERE_EACH(pProj, pSetOp->pProjectionList) {

View File

@ -2594,7 +2594,9 @@ static bool tbCntScanOptIsEligibleConds(STbCntScanOptInfo* pInfo, SNode* pCondit
if (NULL == pConditions) {
return true;
}
if (LIST_LENGTH(pInfo->pAgg->pGroupKeys) != 0) {
return false;
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pConditions)) {
return tbCntScanOptIsEligibleLogicCond(pInfo, (SLogicConditionNode*)pConditions);
}

View File

@ -19,6 +19,14 @@
#include "scalar.h"
#include "tglobal.h"
static void debugPrintNode(SNode* pNode) {
char* pStr = NULL;
nodesNodeToString(pNode, false, &pStr, NULL);
printf("%s\n", pStr);
taosMemoryFree(pStr);
return;
}
static void dumpQueryPlan(SQueryPlan* pPlan) {
if (!tsQueryPlannerTrace) {
return;

View File

@ -118,6 +118,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-19201.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-3404.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/balance_vgroups_r1.py -N 6
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosShell.py

View File

@ -103,7 +103,14 @@ endi
if $data62 != 5 then
return -1
endi
sql select count(table_name) from information_schema.ins_tables where db_name='db1' and stable_name='sta' group by stable_name
print $rows , $data00
if $rows != 1 then
return -1
endi
if $data00 != 8 then
return -1
endi
sql select distinct db_name from information_schema.ins_tables;
print $rows
if $rows != 4 then

View File

@ -25,4 +25,21 @@ if $data05 != @0021001@ then
return -1
endi
sql create table st (ts timestamp, f int) tags (t int);
sql insert into ct1 using st tags(1) values(now, 1)(now+1s, 2)
sql insert into ct2 using st tags(2) values(now+2s, 3)(now+3s, 4)
sql select count(*) from (select * from ct1 union all select * from ct2)
if $rows != 1 then
return -1
endi
if $data00 != 4 then
return -1
endi
sql select count(*) from (select * from ct1 union select * from ct2)
if $rows != 1 then
return -1
endi
if $data00 != 4 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,101 @@
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
class TDTestCase:
hostname = socket.gethostname()
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
#tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), logSql) # output sql.txt file
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def create_tables(self):
tdSql.execute(f"CREATE STABLE `stb5` (`ts` TIMESTAMP, `ip_value` FLOAT, `ip_quality` INT) TAGS (`t1` INT)")
tdSql.execute(f"CREATE TABLE `t_11` USING `stb5` (`t1`) TAGS (1)")
def insert_data(self):
tdLog.debug("start to insert data ............")
tdSql.execute(f"INSERT INTO `t_11` VALUES ('2023-05-10 09:30:47.722', 10.30000, 100)")
tdSql.execute(f"INSERT INTO `t_11` VALUES ('2023-05-10 09:30:56.383', 12.30000, 100)")
tdSql.execute(f"INSERT INTO `t_11` VALUES ('2023-05-10 09:48:55.778', 13.30000, 100)")
tdSql.execute(f"INSERT INTO `t_11` VALUES ('2023-05-10 09:51:50.821', 9.30000, 100)")
tdSql.execute(f"INSERT INTO `t_11` VALUES ('2023-05-10 09:58:07.162', 9.30000, 100)")
tdSql.execute(f"INSERT INTO `t_11` VALUES ('2023-05-10 13:41:16.075', 9.30000, 100)")
tdSql.execute(f"INSERT INTO `t_11` VALUES ('2023-05-13 14:12:58.318', 21.00000, 100)")
tdSql.execute(f"INSERT INTO `t_11` VALUES ('2023-05-13 14:13:21.328', 1.10000, 100)")
tdSql.execute(f"INSERT INTO `t_11` VALUES ('2023-05-13 14:35:24.258', 1.30000, 100)")
tdSql.execute(f"INSERT INTO `t_11` VALUES ('2023-05-13 16:56:49.033', 1.80000, 100)")
tdLog.debug("insert data ............ [OK]")
def run(self):
tdSql.prepare()
self.create_tables()
self.insert_data()
tdLog.printNoPrefix("======== test TS-3404")
tdSql.query(f"select _irowts, interp(ip_value) from t_11 range('2023-05-13 14:00:00', '2023-05-13 15:00:00') every(300s) fill(linear);")
tdSql.checkRows(13)
tdSql.checkData(0, 0, '2023-05-13 14:00:00.000')
tdSql.checkData(1, 0, '2023-05-13 14:05:00.000')
tdSql.checkData(2, 0, '2023-05-13 14:10:00.000')
tdSql.checkData(3, 0, '2023-05-13 14:15:00.000')
tdSql.checkData(4, 0, '2023-05-13 14:20:00.000')
tdSql.checkData(5, 0, '2023-05-13 14:25:00.000')
tdSql.checkData(6, 0, '2023-05-13 14:30:00.000')
tdSql.checkData(7, 0, '2023-05-13 14:35:00.000')
tdSql.checkData(8, 0, '2023-05-13 14:40:00.000')
tdSql.checkData(9, 0, '2023-05-13 14:45:00.000')
tdSql.checkData(10, 0, '2023-05-13 14:50:00.000')
tdSql.checkData(11, 0, '2023-05-13 14:55:00.000')
tdSql.checkData(12, 0, '2023-05-13 15:00:00.000')
tdSql.checkData(0, 1, 20.96512)
tdSql.checkData(1, 1, 20.97857)
tdSql.checkData(2, 1, 20.99201)
tdSql.checkData(3, 1, 1.114917)
tdSql.checkData(4, 1, 1.160271)
tdSql.checkData(5, 1, 1.205625)
tdSql.checkData(6, 1, 1.250978)
tdSql.checkData(7, 1, 1.296333)
tdSql.checkData(8, 1, 1.316249)
tdSql.checkData(9, 1, 1.333927)
tdSql.checkData(10, 1, 1.351607)
tdSql.checkData(11, 1, 1.369285)
tdSql.checkData(12, 1, 1.386964)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())