fix:conflicts

This commit is contained in:
wangmm0220 2023-03-29 17:53:05 +08:00
commit 2243184a4d
49 changed files with 1031 additions and 481 deletions

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 2864326
GIT_TAG e82b9fc
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -178,7 +178,7 @@ int32_t getJsonValueLen(const char* data);
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows);
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
const SColumnInfoData* pSource, int32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,

View File

@ -34,7 +34,7 @@ typedef enum {
WRITE_QUEUE,
APPLY_QUEUE,
SYNC_QUEUE,
SYNC_CTRL_QUEUE,
SYNC_RD_QUEUE,
STREAM_QUEUE,
QUEUE_MAX,
} EQueueType;

View File

@ -259,7 +259,7 @@ enum {
TD_NEW_MSG_SEG(TDMT_SYNC_MSG)
TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL) // no longer used
TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT_ELECTION, "sync-elect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL) // no longer used
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_BATCH, "sync-client-request-batch", NULL, NULL)

View File

@ -23,20 +23,19 @@ extern "C" {
#ifndef _STREAM_STATE_H_
#define _STREAM_STATE_H_
typedef struct SStreamTask SStreamTask;
typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2);
typedef struct STdbState {
SStreamTask* pOwner;
TDB* db;
TTB* pStateDb;
TTB* pFuncStateDb;
TTB* pFillStateDb; // todo refactor
TTB* pSessionStateDb;
TTB* pParNameDb;
TTB* pParTagDb;
TXN* txn;
struct SStreamTask* pOwner;
TDB* db;
TTB* pStateDb;
TTB* pFuncStateDb;
TTB* pFillStateDb; // todo refactor
TTB* pSessionStateDb;
TTB* pParNameDb;
TTB* pParTagDb;
TXN* txn;
} STdbState;
// incremental state storage
@ -45,7 +44,7 @@ typedef struct {
int32_t number;
} SStreamState;
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
SStreamState* streamStateOpen(char* path, struct SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
void streamStateClose(SStreamState* pState);
int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState);

View File

@ -295,7 +295,7 @@ typedef struct {
SEpSet epSet;
} SStreamChildEpInfo;
typedef struct SStreamTask {
struct SStreamTask {
int64_t streamId;
int32_t taskId;
int32_t totalLevel;
@ -362,8 +362,7 @@ typedef struct SStreamTask {
int64_t checkpointingId;
int32_t checkpointAlignCnt;
} SStreamTask;
};
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
@ -587,7 +586,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
// int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask);

View File

@ -48,6 +48,13 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn);
*/
int32_t tSimpleHashGetSize(const SSHashObj *pHashObj);
/**
* set the free function pointer
* @param pHashObj
* @param freeFp
*/
void tSimpleHashSetFreeFp(SSHashObj* pHashObj, _hash_free_fn_t freeFp);
int32_t tSimpleHashPrint(const SSHashObj *pHashObj);
/**

View File

@ -76,7 +76,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
goto End;
}
if ((code = taosCheckVersionCompatibleFromStr(version, connectRsp.sVer, 2)) != 0) {
if ((code = taosCheckVersionCompatibleFromStr(version, connectRsp.sVer, 3)) != 0) {
setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem);
goto End;
@ -506,7 +506,7 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, true);
}
if(code != 0){
if (code != 0) {
taosMemoryFree(pRes);
}
tFreeSShowVariablesRsp(&rsp);

View File

@ -998,7 +998,8 @@ TEST(clientCase, sub_db_test) {
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topic_db1");
tmq_list_append(topicList, "topic_t1");
tmq_list_append(topicList, "topic_s2");
// 启动订阅
tmq_subscribe(tmq, topicList);

View File

@ -147,9 +147,17 @@ int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) {
return TSDB_CODE_SUCCESS;
}
static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData,
int32_t itemLen, int32_t numOfRows) {
ASSERT(pColumnInfoData->info.bytes >= itemLen);
static int32_t doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData,
int32_t itemLen, int32_t numOfRows, bool trimValue) {
if (pColumnInfoData->info.bytes < itemLen) {
uWarn("column/tag actual data len %d is bigger than schema len %d, trim it:%d", itemLen, pColumnInfoData->info.bytes, trimValue);
if (trimValue) {
itemLen = pColumnInfoData->info.bytes;
} else {
return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
}
}
size_t start = 1;
// the first item
@ -178,10 +186,12 @@ static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t curren
pColumnInfoData->varmeta.length += numOfRows * itemLen;
}
return TSDB_CODE_SUCCESS;
}
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
uint32_t numOfRows) {
uint32_t numOfRows, bool trimValue) {
int32_t len = pColumnInfoData->info.bytes;
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
len = varDataTLen(pData);
@ -193,8 +203,7 @@ int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow,
}
}
doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows);
return TSDB_CODE_SUCCESS;
return doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows, trimValue);
}
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,

View File

@ -34,7 +34,7 @@ typedef struct SMnodeMgmt {
SSingleWorker readWorker;
SSingleWorker writeWorker;
SSingleWorker syncWorker;
SSingleWorker syncCtrlWorker;
SSingleWorker syncRdWorker;
bool stopped;
int32_t refCount;
TdThreadRwlock lock;
@ -54,7 +54,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt);
void mmStopWorker(SMnodeMgmt *pMgmt);
int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToSyncCtrlQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);

View File

@ -188,7 +188,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
@ -198,11 +199,12 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER;
code = 0;

View File

@ -111,8 +111,8 @@ int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg);
}
int32_t mmPutMsgToSyncCtrlQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->syncCtrlWorker, pMsg);
int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->syncRdWorker, pMsg);
}
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
@ -151,8 +151,8 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
case SYNC_QUEUE:
pWorker = &pMgmt->syncWorker;
break;
case SYNC_CTRL_QUEUE:
pWorker = &pMgmt->syncCtrlWorker;
case SYNC_RD_QUEUE:
pWorker = &pMgmt->syncRdWorker;
break;
default:
terrno = TSDB_CODE_INVALID_PARA;
@ -238,12 +238,12 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SSingleWorkerCfg scCfg = {
.min = 1,
.max = 1,
.name = "mnode-sync-ctrl",
.name = "mnode-sync-rd",
.fp = (FItem)mmProcessSyncMsg,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->syncCtrlWorker, &scCfg) != 0) {
dError("failed to start mnode mnode-sync-ctrl worker since %s", terrstr());
if (tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg) != 0) {
dError("failed to start mnode mnode-sync-rd worker since %s", terrstr());
return -1;
}
@ -259,6 +259,6 @@ void mmStopWorker(SMnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->readWorker);
tSingleWorkerCleanup(&pMgmt->writeWorker);
tSingleWorkerCleanup(&pMgmt->syncWorker);
tSingleWorkerCleanup(&pMgmt->syncCtrlWorker);
tSingleWorkerCleanup(&pMgmt->syncRdWorker);
dDebug("mnode workers are closed");
}

View File

@ -59,7 +59,7 @@ typedef struct {
SVnode *pImpl;
SMultiWorker pWriteW;
SMultiWorker pSyncW;
SMultiWorker pSyncCtrlW;
SMultiWorker pSyncRdW;
SMultiWorker pApplyW;
STaosQueue *pQueryQ;
STaosQueue *pStreamQ;
@ -107,7 +107,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc);
int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToSyncRdQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);

View File

@ -549,7 +549,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
@ -559,12 +560,12 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
code = 0;

View File

@ -98,9 +98,9 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
pVnode->pSyncW.queue->threadId);
tMultiWorkerCleanup(&pVnode->pSyncW);
dInfo("vgId:%d, wait for vnode sync ctrl queue:%p is empty, thread:%08" PRId64, pVnode->vgId,
pVnode->pSyncCtrlW.queue, pVnode->pSyncCtrlW.queue->threadId);
tMultiWorkerCleanup(&pVnode->pSyncCtrlW);
dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
pVnode->pSyncRdW.queue->threadId);
tMultiWorkerCleanup(&pVnode->pSyncRdW);
dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
pVnode->pApplyW.queue->threadId);

View File

@ -216,9 +216,9 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pSyncW.queue, pMsg);
break;
case SYNC_CTRL_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-sync-ctrl queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pSyncCtrlW.queue, pMsg);
case SYNC_RD_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-sync-rd queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pSyncRdW.queue, pMsg);
break;
case APPLY_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg);
@ -234,9 +234,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
return code;
}
int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return vmPutMsgToQueue(pMgmt, pMsg, SYNC_CTRL_QUEUE);
}
int32_t vmPutMsgToSyncRdQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_RD_QUEUE); }
int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); }
@ -327,18 +325,18 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
SMultiWorkerCfg wcfg = {.max = 1, .name = "vnode-write", .fp = (FItems)vnodeProposeWriteMsg, .param = pVnode->pImpl};
SMultiWorkerCfg scfg = {.max = 1, .name = "vnode-sync", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
SMultiWorkerCfg sccfg = {.max = 1, .name = "vnode-sync-ctrl", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
SMultiWorkerCfg sccfg = {.max = 1, .name = "vnode-sync-rd", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
SMultiWorkerCfg acfg = {.max = 1, .name = "vnode-apply", .fp = (FItems)vnodeApplyWriteMsg, .param = pVnode->pImpl};
(void)tMultiWorkerInit(&pVnode->pWriteW, &wcfg);
(void)tMultiWorkerInit(&pVnode->pSyncW, &scfg);
(void)tMultiWorkerInit(&pVnode->pSyncCtrlW, &sccfg);
(void)tMultiWorkerInit(&pVnode->pSyncRdW, &sccfg);
(void)tMultiWorkerInit(&pVnode->pApplyW, &acfg);
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncCtrlW.queue == NULL ||
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL ||
pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
@ -348,8 +346,8 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pWriteW.queue->threadId);
dInfo("vgId:%d, sync-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
pVnode->pSyncW.queue->threadId);
dInfo("vgId:%d, sync-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncCtrlW.queue,
pVnode->pSyncCtrlW.queue->threadId);
dInfo("vgId:%d, sync-rd-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
pVnode->pSyncRdW.queue->threadId);
dInfo("vgId:%d, apply-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
pVnode->pApplyW.queue->threadId);
dInfo("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ);

View File

@ -247,6 +247,13 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
return pRebInfo;
}
static void freeRebalanceItem(void* param) {
SMqRebInfo* pInfo = param;
taosArrayDestroy(pInfo->lostConsumers);
taosArrayDestroy(pInfo->newConsumers);
taosArrayDestroy(pInfo->removedConsumers);
}
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -262,8 +269,21 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
}
SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
if (pRebMsg == NULL) {
mError("failed to create the rebalance msg, size:%d, quit mq timer", (int32_t) sizeof(SMqDoRebalanceMsg));
mndRebEnd();
return TSDB_CODE_OUT_OF_MEMORY;
}
pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
// TODO set cleanfp
if (pRebMsg->rebSubHash == NULL) {
mError("failed to create rebalance hashmap");
rpcFreeCont(pRebMsg);
mndRebEnd();
return TSDB_CODE_OUT_OF_MEMORY;
}
taosHashSetFreeFp(pRebMsg->rebSubHash, freeRebalanceItem);
// iterate all consumers, find all modification
while (1) {
@ -356,7 +376,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
} else {
taosHashCleanup(pRebMsg->rebSubHash);
rpcFreeCont(pRebMsg);
mDebug("mq rebalance finished, no modification");
mDebug("mq timer finished, no need to re-balance");
mndRebEnd();
}
return 0;
@ -852,6 +872,53 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
return 0;
}
static void updateConsumerStatus(SMqConsumerObj* pConsumer) {
int32_t status = pConsumer->status;
if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pConsumer->status = MQ_CONSUMER_STATUS__READY;
} else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
}
} else {
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
} else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
pConsumer->status = MQ_CONSUMER_STATUS__LOST;
}
}
}
// remove from new topic
static void removeFromNewTopicList(SMqConsumerObj* pConsumer, const char* pTopic) {
int32_t size = taosArrayGetSize(pConsumer->rebNewTopics);
for (int32_t i = 0; i < taosArrayGetSize(pConsumer->rebNewTopics); i++) {
char *p = taosArrayGetP(pConsumer->rebNewTopics, i);
if (strcmp(pTopic, p) == 0) {
taosArrayRemove(pConsumer->rebNewTopics, i);
taosMemoryFree(p);
mDebug("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId,
pTopic, (int) taosArrayGetSize(pConsumer->rebNewTopics));
break;
}
}
}
// remove from removed topic
static void removeFromRemoveTopicList(SMqConsumerObj* pConsumer, const char* pTopic) {
int32_t size = taosArrayGetSize(pConsumer->rebRemovedTopics);
for (int32_t i = 0; i < size; i++) {
char *p = taosArrayGetP(pConsumer->rebRemovedTopics, i);
if (strcmp(pTopic, p) == 0) {
taosArrayRemove(pConsumer->rebRemovedTopics, i);
taosMemoryFree(p);
break;
}
}
}
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", uptime:%" PRId64,
pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime);
@ -862,6 +929,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
/*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
/*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
// this new consumer has identical topics with one existed consumers.
if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) {
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
} else {
@ -878,7 +946,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pNewConsumer->assignedTopics = tmp;
pOldConsumer->subscribeTime = pNewConsumer->upTime;
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
}
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
@ -918,71 +985,48 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
/*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1);*/
/*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);*/
ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);
char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
char *addedTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
// not exist in current topic
bool existing = false;
#if 1
bool existing = false;
int32_t numOfExistedTopics = taosArrayGetSize(pOldConsumer->currentTopics);
for (int32_t i = 0; i < numOfExistedTopics; i++) {
char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
if (strcmp(topic, addedTopic) == 0) {
if (strcmp(topic, pNewTopic) == 0) {
existing = true;
}
}
#endif
// remove from new topic
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
if (strcmp(addedTopic, topic) == 0) {
taosArrayRemove(pOldConsumer->rebNewTopics, i);
taosMemoryFree(topic);
break;
}
}
removeFromNewTopicList(pOldConsumer, pNewTopic);
// add to current topic
if (!existing) {
taosArrayPush(pOldConsumer->currentTopics, &addedTopic);
taosArrayPush(pOldConsumer->currentTopics, &pNewTopic);
taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
} else {
taosMemoryFree(addedTopic);
taosMemoryFree(pNewTopic);
}
// set status
int32_t status = pOldConsumer->status;
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
} else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
}
} else {
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
} else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
}
}
updateConsumerStatus(pOldConsumer);
// the re-balance is triggered when the new consumer is launched.
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
", current topics:%d, newTopics:%d, removeTopics:%d",
pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
mndConsumerStatusName(pOldConsumer->status),
pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics));
mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
(int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
(int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
/*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);*/
/*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);*/
char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
// not exist in new topic
#if 0
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
@ -991,14 +1035,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
#endif
// remove from removed topic
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebRemovedTopics); i++) {
char *topic = taosArrayGetP(pOldConsumer->rebRemovedTopics, i);
if (strcmp(removedTopic, topic) == 0) {
taosArrayRemove(pOldConsumer->rebRemovedTopics, i);
taosMemoryFree(topic);
break;
}
}
removeFromRemoveTopicList(pOldConsumer, removedTopic);
// remove from current topic
int32_t i = 0;
@ -1011,32 +1048,20 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
break;
}
}
// must find the topic
/*A(i < sz);*/
// set status
int32_t status = pOldConsumer->status;
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
} else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
}
} else {
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
} else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
}
}
updateConsumerStatus(pOldConsumer);
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
mDebug("consumer:0x%" PRIx64 " state %d(%s) -> %d(%s), new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
", current topics:%d, newTopics:%d, removeTopics:%d",
pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
mndConsumerStatusName(pOldConsumer->status),
pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics));
mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
(int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
(int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
}
taosWUnLockLatch(&pOldConsumer->lock);

View File

@ -226,7 +226,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
goto _OVER;
}
if ((code = taosCheckVersionCompatibleFromStr(connReq.sVer, version, 2)) != 0) {
if ((code = taosCheckVersionCompatibleFromStr(connReq.sVer, version, 3)) != 0) {
terrno = code;
goto _OVER;
}

View File

@ -197,24 +197,20 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
return pRebSub;
}
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
int32_t totalVgNum = pOutput->pSub->vgNum;
const char *sub = pOutput->pSub->key;
mInfo("sub:%s mq re-balance %d vgroups", sub, pOutput->pSub->vgNum);
static void doRemoveExistedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
const char *pSubKey = pOutput->pSub->key;
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
// 2. check and get actual removed consumers, put their vg into hash
int32_t removedNum = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
int32_t actualRemoved = 0;
for (int32_t i = 0; i < removedNum; i++) {
for (int32_t i = 0; i < numOfRemoved; i++) {
uint64_t consumerId = *(uint64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
// consumer exists till now
if (pConsumerEp) {
actualRemoved++;
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
for (int32_t j = 0; j < consumerVgNum; j++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
@ -223,52 +219,66 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.newConsumerId = -1,
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:0x%" PRIx64, sub, pVgEp->vgId, consumerId);
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
}
taosArrayDestroy(pConsumerEp->vgs);
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
// put into removed
taosArrayPush(pOutput->removedConsumers, &consumerId);
}
}
if (removedNum != actualRemoved) {
mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", sub, removedNum, actualRemoved);
if (numOfRemoved != actualRemoved) {
mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", pSubKey, numOfRemoved, actualRemoved);
} else {
mInfo("sub:%s removed %d consumers", pSubKey, numOfRemoved);
}
}
// if previously no consumer, there are vgs not assigned
{
int32_t consumerVgNum = taosArrayGetSize(pOutput->pSub->unassignedVgs);
for (int32_t i = 0; i < consumerVgNum; i++) {
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs);
SMqRebOutputVg rebOutput = {
.oldConsumerId = -1,
.newConsumerId = -1,
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq re-balance remove vgId:%d from unassigned", sub, pVgEp->vgId);
}
static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);
const char *pSubKey = pOutput->pSub->key;
for (int32_t i = 0; i < numOfNewConsumers; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
SMqConsumerEp newConsumerEp;
newConsumerEp.consumerId = consumerId;
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
taosArrayPush(pOutput->newConsumers, &consumerId);
mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId);
}
}
// 3. calc vg number of each consumer
int32_t afterRebConsumerNum = pInput->oldConsumerNum + taosArrayGetSize(pInput->pRebInfo->newConsumers) -
taosArrayGetSize(pInput->pRebInfo->removedConsumers);
int32_t minVgCnt = 0;
int32_t imbConsumerNum = 0;
// calc num
if (afterRebConsumerNum) {
minVgCnt = totalVgNum / afterRebConsumerNum;
imbConsumerNum = totalVgNum % afterRebConsumerNum;
static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj* pHash) {
const char *pSubKey = pOutput->pSub->key;
int32_t numOfVgroups = taosArrayGetSize(pOutput->pSub->unassignedVgs);
for (int32_t i = 0; i < numOfVgroups; i++) {
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs);
SMqRebOutputVg rebOutput = {
.oldConsumerId = -1,
.newConsumerId = -1,
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq re-balance remove vgId:%d from unassigned", pSubKey, pVgEp->vgId);
}
}
mInfo("sub:%s mq re-balance %d consumers: at least %d vgs each, %d consumers has more vgs", sub,
afterRebConsumerNum, minVgCnt, imbConsumerNum);
static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj* pHash, int32_t minVgCnt, int32_t imbConsumerNum) {
const char *pSubKey = pOutput->pSub->key;
// 4. first scan: remove consumer more than wanted, put to remove hash
int32_t imbCnt = 0;
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) {
@ -276,8 +286,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
// all old consumers still existing are touched
// TODO optimize: touch only consumer whose vgs changed
taosArrayPush(pOutput->touchedConsumers, &pConsumerEp->consumerId);
@ -296,13 +306,13 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", sub, pVgEp->vgId,
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
pConsumerEp->consumerId);
}
imbCnt++;
}
} else {
// pop until equal minVg
// all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
SMqRebOutputVg outputVg = {
@ -311,36 +321,67 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", sub, pVgEp->vgId,
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
pConsumerEp->consumerId);
}
}
}
}
}
// 5. add new consumer into sub
{
int32_t consumerNum = taosArrayGetSize(pInput->pRebInfo->newConsumers);
for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
int32_t totalVgNum = pOutput->pSub->vgNum;
const char *pSubKey = pOutput->pSub->key;
SMqConsumerEp newConsumerEp;
newConsumerEp.consumerId = consumerId;
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
taosArrayPush(pOutput->newConsumers, &consumerId);
mInfo("sub:%s mq rebalance add new consumer:0x%" PRIx64, sub, consumerId);
}
int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum,
pInput->oldConsumerNum, numOfAdded, numOfRemoved);
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
// 2. check and get actual removed consumers, put their vg into hash
doRemoveExistedConsumers(pOutput, pHash, pInput);
// 3. if previously no consumer, there are vgs not assigned
addUnassignedVgroups(pOutput, pHash);
// 4. calc vg number of each consumer
int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
int32_t minVgCnt = 0;
int32_t imbConsumerNum = 0;
// calc num
if (numOfFinal) {
minVgCnt = totalVgNum / numOfFinal;
imbConsumerNum = totalVgNum % numOfFinal;
mInfo("sub:%s mq re-balance %d consumers: at least %d vgs each, %d consumers has 1 more vgroups than avg value",
pSubKey, numOfFinal, minVgCnt, imbConsumerNum);
} else {
mInfo("sub:%s no consumer subscribe this topic", pSubKey);
}
// 6. second scan: find consumer do not have enough vg, extract from temporary hash and assign to new consumer.
// 5. first scan: remove vgroups from te consumers, who have more vgroups than the threashold value that is
// minVgCnt, and then put them into the recycled hash list
transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum);
// 6. add new consumer into sub
doAddNewConsumers(pOutput, pInput);
// 7. second scan: find consumer do not have enough vgroups, extract from temporary hash and assign to them
// All related vg should be put into rebVgs
SMqRebOutputVg *pRebVg = NULL;
void *pRemovedIter = NULL;
pIter = NULL;
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) break;
if (pIter == NULL) {
break;
}
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
// push until equal minVg
@ -348,8 +389,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
// iter hash and find one vg
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
if (pRemovedIter == NULL) {
mError("sub:%s removed iter is null", sub);
continue;
mError("sub:%s removed iter is null", pSubKey);
break;
}
pRebVg = (SMqRebOutputVg *)pRemovedIter;
@ -409,15 +450,15 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
taosArrayPush(pOutput->rebVgs, pRebOutput);
mInfo("sub:%s mq re-balance unassign vgId:%d (second scan)", sub, pRebOutput->pVgEp->vgId);
mInfo("sub:%s mq re-balance unassign vgId:%d (second scan)", pSubKey, pRebOutput->pVgEp->vgId);
}
}
// 8. generate logs
mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", sub);
mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", pSubKey);
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, sub,
mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey,
pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
}
{
@ -427,10 +468,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
if (pIter == NULL) break;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", sub, pConsumerEp->consumerId, sz);
mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz);
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, sub, pVgEp->vgId,
mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId,
pConsumerEp->consumerId);
}
}
@ -555,17 +596,23 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SMqDoRebalanceMsg *pReq = pMsg->pCont;
void *pIter = NULL;
bool rebalanceOnce = false; // to ensure only once.
mInfo("mq re-balance start");
mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash));
// here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction.
while (1) {
if (rebalanceOnce) {
break;
}
pIter = taosHashIterate(pReq->rebSubHash, pIter);
if (pIter == NULL) {
break;
}
// todo handle the malloc failure
SMqRebInputObj rebInput = {0};
SMqRebOutputObj rebOutput = {0};
rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));
rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t));
@ -582,9 +629,10 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
if (pTopic == NULL) {
mError("mq re-balance %s ignored since topic %s not exist", pRebInfo->key, topic);
mError("mq re-balance %s ignored since topic %s doesn't exist", pRebInfo->key, topic);
continue;
}
@ -604,11 +652,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
mndReleaseTopic(pMnode, pTopic);
rebInput.oldConsumerNum = 0;
mInfo("topic:%s has no consumers sub yet", topic);
} else {
taosRLockLatch(&pSub->lock);
rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
rebOutput.pSub = tCloneSubscribeObj(pSub);
taosRUnLockLatch(&pSub->lock);
mInfo("topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum);
mndReleaseSubscribe(pMnode, pSub);
}
@ -623,16 +673,14 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
mError("mq re-balance persist output error, possibly vnode splitted or dropped");
}
taosArrayDestroy(pRebInfo->lostConsumers);
taosArrayDestroy(pRebInfo->newConsumers);
taosArrayDestroy(pRebInfo->removedConsumers);
taosArrayDestroy(rebOutput.newConsumers);
taosArrayDestroy(rebOutput.touchedConsumers);
taosArrayDestroy(rebOutput.removedConsumers);
taosArrayDestroy(rebOutput.rebVgs);
tDeleteSubscribeObj(rebOutput.pSub);
taosMemoryFree(rebOutput.pSub);
rebalanceOnce = true;
}
// reset flag

View File

@ -33,7 +33,7 @@ static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
return -1;
}
int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg);
int32_t code = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
if (code != 0) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;

View File

@ -57,7 +57,7 @@ target_sources(
# tq
"src/tq/tq.c"
"src/tq/tqExec.c"
"src/tq/tqScan.c"
"src/tq/tqMeta.c"
"src/tq/tqRead.c"
"src/tq/tqOffset.c"

View File

@ -182,7 +182,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr);
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
void tsdbReleaseDataBlock(STsdbReader *pReader);
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);

View File

@ -193,7 +193,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode);
void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type);
int tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
int tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);

View File

@ -869,7 +869,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
atomic_store_32(&pHandle->epoch, -1);
// remove if it has been register in the push manager, and return one empty block to consumer
tqRemovePushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
tqUnregisterPushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1);

View File

@ -206,7 +206,84 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
}
#endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
typedef struct {
void* pKey;
int64_t keyLen;
} SItem;
static void recordPushedEntry(SArray* cachedKey, void* pIter);
static void freeItem(void* param) {
SItem* p = (SItem*) param;
taosMemoryFree(p->pKey);
}
static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfKeys = (int32_t) taosArrayGetSize(pCachedKeys);
for (int32_t i = 0; i < numOfKeys; i++) {
SItem* pItem = taosArrayGet(pCachedKeys, i);
if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) {
tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*) pItem->pKey);
}
}
if (numOfKeys > 0) {
tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr));
}
}
static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData,
int32_t dataLen, SArray* pCachedKey) {
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
if (pRsp->reqOffset.version >= ver) {
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId,
pRsp->reqOffset.version, ver);
return;
}
qTaskInfo_t pTaskInfo = pExec->task;
// prepare scan mem data
SPackedData submit = {.msgStr = pData, .msgLen = dataLen, .ver = ver};
if (qStreamSetScanMemData(pTaskInfo, submit) != 0) {
return;
}
// here start to scan submit block to extract the subscribed data
int32_t totalRows = 0;
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(pTaskInfo, &pDataBlock, &ts) < 0) {
tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr());
}
if (pDataBlock == NULL) {
break;
}
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
pRsp->blockNum++;
totalRows += pDataBlock->info.rows;
}
tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d, rows:%d", vgId, pPushEntry->subKey, pRsp->blockNum,
totalRows);
if (pRsp->blockNum > 0) {
tqOffsetResetToLog(&pRsp->rspOffset, ver);
tqPushDataRsp(pTq, pPushEntry);
recordPushedEntry(pCachedKey, pIter);
}
}
int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
int32_t len = msgLen - sizeof(SSubmitReq2Msg);
int32_t vgId = TD_VID(pTq->pVnode);
@ -220,24 +297,19 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
void* data = taosMemoryMalloc(len);
void* data = taosMemoryMalloc(len);
if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to copy data for stream since out of memory");
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
taosArrayDestroy(cachedKeyLens);
// unlock
tqError("failed to copy data for stream since out of memory, vgId:%d", vgId);
taosWUnLockLatch(&pTq->lock);
return -1;
}
memcpy(data, pReq, len);
void* pIter = NULL;
SArray* cachedKey = taosArrayInit(0, sizeof(SItem));
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pPushMgr, pIter);
if (pIter == NULL) {
@ -248,83 +320,29 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
if (pHandle == NULL) {
tqDebug("vgId:%d, cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey);
continue;
}
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
if (pRsp->reqOffset.version >= ver) {
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId,
pRsp->reqOffset.version, ver);
tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId, pPushEntry->subKey);
continue;
}
STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task;
// prepare scan mem data
SPackedData submit = {
.msgStr = data,
.msgLen = len,
.ver = ver,
};
if(qStreamSetScanMemData(task, submit) != 0){
continue;
}
// here start to scan submit block to extract the subscribed data
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr());
}
if (pDataBlock == NULL) {
break;
}
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
pRsp->blockNum++;
}
tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d", vgId, pPushEntry->subKey, pRsp->blockNum);
if (pRsp->blockNum > 0) {
// set offset
tqOffsetResetToLog(&pRsp->rspOffset, ver);
// remove from hash
size_t kLen;
void* key = taosHashGetKey(pIter, &kLen);
void* keyCopy = taosMemoryCalloc(1, kLen + 1);
memcpy(keyCopy, key, kLen);
taosArrayPush(cachedKeys, &keyCopy);
taosArrayPush(cachedKeyLens, &kLen);
tqPushDataRsp(pTq, pPushEntry);
}
doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey);
}
// delete entry
for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) {
void* key = taosArrayGetP(cachedKeys, i);
size_t kLen = *(size_t*)taosArrayGet(cachedKeyLens, i);
if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) {
tqError("vgId:%d, tq push hash remove key error, key: %s", pTq->pVnode->config.vgId, (char*)key);
}
}
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
taosArrayDestroy(cachedKeyLens);
doRemovePushedEntry(cachedKey, pTq);
taosArrayDestroyEx(cachedKey, freeItem);
taosMemoryFree(data);
}
// unlock
taosWUnLockLatch(&pTq->lock);
}
// push data for stream processing
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) {
return 0;
}
if (msgType == TDMT_VND_SUBMIT) {
void* data = taosMemoryMalloc(len);
if (data == NULL) {
@ -332,12 +350,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
tqError("failed to copy data for stream since out of memory");
return -1;
}
memcpy(data, pReq, len);
SPackedData submit = {
.msgStr = data,
.msgLen = len,
.ver = ver,
};
SPackedData submit = {.msgStr = data, .msgLen = len, .ver = ver};
tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq);
tqProcessSubmitReq(pTq, submit);
@ -351,6 +366,13 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
return 0;
}
void recordPushedEntry(SArray* cachedKey, void* pIter) {
size_t kLen = 0;
void* key = taosHashGetKey(pIter, &kLen);
SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen};
taosArrayPush(cachedKey, &item);
}
int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg,
SMqDataRsp* pDataRsp, int32_t type) {
uint64_t consumerId = pRequest->consumerId;
@ -388,8 +410,8 @@ int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest,
return 0;
}
int32_t tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) {
int32_t vgId = TD_VID(pTq->pVnode);
int32_t tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) {
int32_t vgId = TD_VID(pTq->pVnode);
STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen);
if (pEntry != NULL) {

View File

@ -18,7 +18,9 @@
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1;
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = 0;
@ -31,7 +33,8 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
actualLen += sizeof(SRetrieveTableRsp);
taosArrayPush(pRsp->blockDataLen, &actualLen);
taosArrayPush(pRsp->blockData, &buf);
return 0;
return TSDB_CODE_SUCCESS;
}
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) {
@ -62,8 +65,12 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in
}
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
const STqExecHandle* pExec = &pHandle->execHandle;
const int32_t MAX_ROWS_TO_RETURN = 4096;
int32_t vgId = TD_VID(pTq->pVnode);
int32_t code = 0;
int32_t totalRows = 0;
const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task;
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
@ -71,28 +78,34 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
return -1;
}
int32_t rowCnt = 0;
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task start execute", pHandle->consumerId, pTq->pVnode->config.vgId);
tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task start execute", pHandle->consumerId, vgId);
if (qExecTask(task, &pDataBlock, &ts) < 0) {
tqError("consumer:0x%"PRIx64" vgId:%d, task exec error since %s", pHandle->consumerId, pTq->pVnode->config.vgId, terrstr());
tqError("consumer:0x%"PRIx64" vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr());
return -1;
}
tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task end execute, get block:%p", pHandle->consumerId, pTq->pVnode->config.vgId, pDataBlock);
tqDebug("consumer:0x%"PRIx64" vgId:%d tmq task executed, rows:%d, total blocks:%d", pHandle->consumerId, vgId,
pDataBlock->info.rows, pRsp->blockNum);
// current scan should be stopped asap, since the rebalance occurs.
if (pDataBlock == NULL) {
break;
}
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d, failed to add block to rsp msg", vgId);
return code;
}
pRsp->blockNum++;
rowCnt += pDataBlock->info.rows;
if (rowCnt >= 4096) {
totalRows += pDataBlock->info.rows;
if (totalRows >= MAX_ROWS_TO_RETURN) {
break;
}
}

View File

@ -15,6 +15,7 @@
#include "osDef.h"
#include "tsdb.h"
#include "tsimplehash.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
@ -176,14 +177,15 @@ struct STsdbReader {
SBlockLoadSuppInfo suppInfo;
STsdbReadSnap* pReadSnap;
SIOCostSummary cost;
STSchema* pSchema; // the newest version schema
STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
SDataFReader* pFileReader; // the file reader
SDelFReader* pDelFReader; // the del file reader
SArray* pDelIdx; // del file block index;
SBlockInfoBuf blockInfoBuf;
int32_t step;
STsdbReader* innerReader[2];
STSchema* pSchema; // the newest version schema
// STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
SDataFReader* pFileReader; // the file reader
SDelFReader* pDelFReader; // the del file reader
SArray* pDelIdx; // del file block index;
SBlockInfoBuf blockInfoBuf;
int32_t step;
STsdbReader* innerReader[2];
};
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
@ -949,14 +951,17 @@ static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int
pDumpInfo->lastKey = maxKey + step;
}
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
SBlockLoadSuppInfo* pSup) {
if (IS_VAR_DATA_TYPE(pColVal->type)) {
if (!COL_VAL_IS_VALUE(pColVal)) {
colDataSetNULL(pColInfoData, rowIndex);
} else {
varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
if (pColVal->value.nData > pColInfoData->info.bytes) {
tsdbWarn("column cid:%d actual data len %d is bigger than schema len %d", pColVal->cid, pColVal->value.nData, pColInfoData->info.bytes);
return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
}
if (pColVal->value.nData > 0) { // pData may be null, if nData is 0
memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
}
@ -966,6 +971,8 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_
} else {
colDataSetVal(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
}
return TSDB_CODE_SUCCESS;
}
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
@ -1167,6 +1174,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
SSDataBlock* pResBlock = pReader->pResBlock;
int32_t numOfOutputCols = pSupInfo->numOfCols;
int32_t code = TSDB_CODE_SUCCESS;
SColVal cv = {0};
int64_t st = taosGetTimestampUs();
@ -1244,7 +1252,10 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
} else { // varchar/nchar type
for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step) {
tColDataGetValue(pData, j, &cv);
doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
code = doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
if (code) {
return code;
}
}
}
}
@ -1776,23 +1787,29 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
}
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
SFileBlockDumpInfo* pDumpInfo) {
SFileBlockDumpInfo* pDumpInfo, bool *copied) {
// opt version
// 1. it is not a border point
// 2. the direct next point is not an duplicated timestamp
int32_t code = TSDB_CODE_SUCCESS;
*copied = false;
bool asc = (pReader->order == TSDB_ORDER_ASC);
if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && asc) || (pDumpInfo->rowIndex > 0 && (!asc))) {
int32_t step = pReader->order == TSDB_ORDER_ASC ? 1 : -1;
int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
if (nextKey != key) { // merge is not needed
doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
if (code) {
return code;
}
pDumpInfo->rowIndex += step;
return true;
*copied = true;
}
}
return false;
return code;
}
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
@ -1819,20 +1836,34 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
}
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader, bool *copied) {
int32_t code = TSDB_CODE_SUCCESS;
*copied = false;
bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
if (hasVal) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 != ts) {
doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
return true;
code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
if (code) {
return code;
}
*copied = true;
return code;
}
} else {
doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
return true;
code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
if (code) {
return code;
}
*copied = true;
return code;
}
return false;
return code;
}
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
@ -1858,28 +1889,23 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
return pReader->pSchema;
}
if (pReader->pMemSchema == NULL) {
int32_t code =
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
} else {
return pReader->pMemSchema;
}
void** p = tSimpleHashGet(pReader->pSchemaMap, &sversion, sizeof(sversion));
if (p != NULL) {
return *(STSchema**)p;
}
if (pReader->pMemSchema->version == sversion) {
return pReader->pMemSchema;
}
taosMemoryFreeClear(pReader->pMemSchema);
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
if (code != TSDB_CODE_SUCCESS || pReader->pMemSchema == NULL) {
STSchema* ptr = NULL;
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
} else {
return pReader->pMemSchema;
code = tSimpleHashPut(pReader->pSchemaMap, &sversion, sizeof(sversion), &ptr, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
return ptr;
}
}
@ -2022,11 +2048,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
return code;
}
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
@ -2034,7 +2061,8 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
bool mergeBlockData) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
bool copied = false;
int32_t code = TSDB_CODE_SUCCESS;
SRow* pTSRow = NULL;
SRowMerger merge = {0};
TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
@ -2042,7 +2070,12 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
// only last block exists
if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
code = tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied);
if (code) {
return code;
}
if (copied) {
pBlockScanInfo->lastKey = tsLastBlock;
return TSDB_CODE_SUCCESS;
} else {
@ -2060,10 +2093,15 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return code;
}
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
} else { // not merge block data
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
@ -2083,10 +2121,14 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return code;
}
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
return TSDB_CODE_SUCCESS;
@ -2131,7 +2173,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
return code;
}
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge);
@ -2353,7 +2395,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge);
@ -2508,7 +2550,13 @@ bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo*
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
STsdbReader* pReader) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
bool copied = false;
int32_t code = tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo, &copied);
if (code) {
return code;
}
if (copied) {
pBlockScanInfo->lastKey = key;
return TSDB_CODE_SUCCESS;
} else {
@ -2528,11 +2576,11 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
return code;
}
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
return code;
}
}
@ -2649,7 +2697,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
pBlock->nRow <= pReader->capacity) {
if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) {
copyBlockDataToSDataBlock(pReader);
code = copyBlockDataToSDataBlock(pReader);
if (code) {
goto _end;
}
// record the last key value
pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
@ -2696,8 +2747,11 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
break;
}
buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
if (code) {
goto _end;
}
// currently loaded file data block is consumed
if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
@ -2922,6 +2976,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
STableUidList* pUidList = &pStatus->uidList;
int32_t code = TSDB_CODE_SUCCESS;
if (taosHashGetSize(pStatus->pTableMap) == 0) {
return TSDB_CODE_SUCCESS;
@ -2952,7 +3007,11 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
break;
}
buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
if (code) {
return code;
}
if (pResBlock->info.rows >= pReader->capacity) {
break;
}
@ -3032,7 +3091,11 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
break;
}
buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
if (code) {
return code;
}
if (pResBlock->info.rows >= pReader->capacity) {
break;
}
@ -3677,8 +3740,9 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
int32_t code = tsdbRowMergerInit(&merge, piRow, pSchema);
int32_t code = tsdbRowMergerInit2(&merge, pSchema, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -3689,7 +3753,8 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
return code;
}
tsdbRowMerge(&merge, pRow);
pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tsdbRowMergerAdd(&merge, pRow, pSchema);
code =
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
if (code != TSDB_CODE_SUCCESS) {
@ -3784,6 +3849,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo) {
int32_t outputRowIndex = pBlock->info.rows;
int64_t uid = pScanInfo->uid;
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
@ -3806,7 +3872,10 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
tRowGet(pTSRow, pSchema, j, &colVal);
doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo);
code = doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo);
if (code) {
return code;
}
i += 1;
j += 1;
} else if (colId < pSchema->columns[j].colId) {
@ -3836,6 +3905,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
int32_t rowIndex) {
int32_t i = 0, j = 0;
int32_t outputRowIndex = pResBlock->info.rows;
int32_t code = TSDB_CODE_SUCCESS;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
if (pReader->suppInfo.colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
@ -3858,7 +3928,10 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotId[i]);
if (pData->cid == pSupInfo->colId[i]) {
tColDataGetValue(pData, rowIndex, &cv);
doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
code = doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
if (code) {
return code;
}
j += 1;
} else if (pData->cid > pCol->info.colId) {
// the specified column does not exist in file block, fill with null data
@ -3882,6 +3955,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
STsdbReader* pReader) {
SSDataBlock* pBlock = pReader->pResBlock;
int32_t code = TSDB_CODE_SUCCESS;
do {
// SRow* pTSRow = NULL;
@ -3893,13 +3967,20 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
}
if (row.type == TSDBROW_ROW_FMT) {
doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);
code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);
if (freeTSRow) {
taosMemoryFree(row.pTSRow);
}
if (code) {
return code;
}
} else {
doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
if (code) {
break;
}
}
// no data in buffer, return immediately
@ -3912,7 +3993,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
}
} while (1);
return TSDB_CODE_SUCCESS;
return code;
}
// TODO refactor: with createDataBlockScanInfo
@ -4002,6 +4083,11 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
return code;
}
static void freeSchemaFunc(void* param) {
void* p = *(void**)param;
taosMemoryFree(p);
}
// ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) {
@ -4078,6 +4164,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
}
}
pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash);
if (pReader->pSchemaMap == NULL) {
tsdbError("failed init schema hash for reader %s", pReader->idStr);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
tSimpleHashSetFreeFp(pReader->pSchemaMap, freeSchemaFunc);
if (pReader->pSchema != NULL) {
code = updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo);
if (code != TSDB_CODE_SUCCESS) {
@ -4120,7 +4214,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
p->status.uidList.tableUidList = NULL;
p->pReadSnap = NULL;
p->pSchema = NULL;
p->pMemSchema = NULL;
p->pSchemaMap = NULL;
p = pReader->innerReader[1];
@ -4128,7 +4222,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
p->status.uidList.tableUidList = NULL;
p->pReadSnap = NULL;
p->pSchema = NULL;
p->pMemSchema = NULL;
p->pSchemaMap = NULL;
tsdbReaderClose(pReader->innerReader[0]);
tsdbReaderClose(pReader->innerReader[1]);
@ -4208,10 +4302,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree(pReader->idStr);
taosMemoryFree(pReader->pSchema);
if (pReader->pMemSchema != pReader->pSchema) {
taosMemoryFree(pReader->pMemSchema);
}
tSimpleHashCleanup(pReader->pSchemaMap);
taosMemoryFreeClear(pReader);
}
@ -4371,14 +4462,14 @@ int32_t tsdbReaderResume(STsdbReader* pReader) {
pPrevReader->status.pTableMap = pReader->status.pTableMap;
pPrevReader->status.uidList = pReader->status.uidList;
pPrevReader->pSchema = pReader->pSchema;
pPrevReader->pMemSchema = pReader->pMemSchema;
pPrevReader->pSchemaMap = pReader->pSchemaMap;
pPrevReader->pReadSnap = pReader->pReadSnap;
pNextReader->capacity = 1;
pNextReader->status.pTableMap = pReader->status.pTableMap;
pNextReader->status.uidList = pReader->status.uidList;
pNextReader->pSchema = pReader->pSchema;
pNextReader->pMemSchema = pReader->pMemSchema;
pNextReader->pSchemaMap = pReader->pSchemaMap;
pNextReader->pReadSnap = pReader->pReadSnap;
code = doOpenReaderImpl(pPrevReader);
@ -4399,43 +4490,51 @@ _err:
return code;
}
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
int32_t code = TSDB_CODE_SUCCESS;
// cleanup the data that belongs to the previous data block
SSDataBlock* pBlock = pReader->pResBlock;
blockDataCleanup(pBlock);
*hasNext = false;
SReaderStatus* pStatus = &pReader->status;
if (taosHashGetSize(pStatus->pTableMap) == 0) {
return false;
return code;
}
if (pStatus->loadFromFile) {
int32_t code = buildBlockFromFiles(pReader);
code = buildBlockFromFiles(pReader);
if (code != TSDB_CODE_SUCCESS) {
return false;
return code;
}
if (pBlock->info.rows > 0) {
return true;
} else {
if (pBlock->info.rows <= 0) {
resetTableListIndex(&pReader->status);
buildBlockFromBufferSequentially(pReader);
return pBlock->info.rows > 0;
code = buildBlockFromBufferSequentially(pReader);
}
} else { // no data in files, let's try the buffer
buildBlockFromBufferSequentially(pReader);
return pBlock->info.rows > 0;
code = buildBlockFromBufferSequentially(pReader);
}
*hasNext = pBlock->info.rows > 0;
return code;
}
bool tsdbNextDataBlock(STsdbReader* pReader) {
int32_t tsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
int32_t code = TSDB_CODE_SUCCESS;
*hasNext = false;
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) {
return false;
return code;
}
SReaderStatus* pStatus = &pReader->status;
int32_t code = tsdbAcquireReader(pReader);
code = tsdbAcquireReader(pReader);
qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);
if (pReader->suspended) {
@ -4443,16 +4542,21 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
}
if (pReader->innerReader[0] != NULL && pReader->step == 0) {
bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
code = doTsdbNextDataBlock(pReader->innerReader[0], hasNext);
if (code) {
tsdbReleaseReader(pReader);
return code;
}
pReader->step = EXTERNAL_ROWS_PREV;
if (ret) {
if (*hasNext) {
pStatus = &pReader->innerReader[0]->status;
if (pStatus->composedDataBlock) {
qTrace("tsdb/read: %p, unlock read mutex", pReader);
tsdbReleaseReader(pReader);
}
return ret;
return code;
}
}
@ -4469,14 +4573,19 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
pReader->step = EXTERNAL_ROWS_MAIN;
}
bool ret = doTsdbNextDataBlock(pReader);
if (ret) {
code = doTsdbNextDataBlock(pReader, hasNext);
if (code != TSDB_CODE_SUCCESS) {
tsdbReleaseReader(pReader);
return code;
}
if (*hasNext) {
if (pStatus->composedDataBlock) {
qTrace("tsdb/read: %p, unlock read mutex", pReader);
tsdbReleaseReader(pReader);
}
return ret;
return code;
}
if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) {
@ -4488,23 +4597,28 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
return code;
}
ret = doTsdbNextDataBlock(pReader->innerReader[1]);
code = doTsdbNextDataBlock(pReader->innerReader[1], hasNext);
if (code != TSDB_CODE_SUCCESS) {
tsdbReleaseReader(pReader);
return code;
}
pReader->step = EXTERNAL_ROWS_NEXT;
if (ret) {
if (*hasNext) {
pStatus = &pReader->innerReader[1]->status;
if (pStatus->composedDataBlock) {
qTrace("tsdb/read: %p, unlock read mutex", pReader);
tsdbReleaseReader(pReader);
}
return ret;
return code;
}
}
qTrace("tsdb/read: %p, unlock read mutex", pReader);
tsdbReleaseReader(pReader);
return false;
return code;
}
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) {
@ -4649,20 +4763,27 @@ STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, co
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
int32_t code = TSDB_CODE_SUCCESS;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr);
if (pBlockScanInfo == NULL) {
return NULL;
}
int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
if (code != TSDB_CODE_SUCCESS) {
tBlockDataDestroy(&pStatus->fileBlockData);
terrno = code;
return NULL;
}
code = copyBlockDataToSDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) {
tBlockDataDestroy(&pStatus->fileBlockData);
terrno = code;
return NULL;
}
copyBlockDataToSDataBlock(pReader);
return pReader->pResBlock;
}
@ -4703,7 +4824,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
return TSDB_CODE_SUCCESS;
}
SReaderStatus* pStatus = &pReader->status;
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
pReader->order = pCond->order;
@ -4724,9 +4845,9 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
resetDataBlockIterator(pBlockIter, pReader->order);
resetTableListIndex(&pReader->status);
bool asc = ASCENDING_TRAVERSE(pReader->order);
int32_t step = asc? 1:-1;
int64_t ts = asc? pReader->window.skey - 1 : pReader->window.ekey + 1;
bool asc = ASCENDING_TRAVERSE(pReader->order);
int32_t step = asc ? 1 : -1;
int64_t ts = asc ? pReader->window.skey - 1 : pReader->window.ekey + 1;
resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step);
int32_t code = 0;

View File

@ -378,7 +378,7 @@ static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
return -1;
}
int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg);
int32_t code = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
if (code != 0) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;

View File

@ -584,10 +584,16 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
if (isNullVal) {
colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
} else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows);
code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
taosMemoryFree(data);
}
if (code) {
if (freeReader) {
metaReaderClear(&mr);
}
return code;
}
} else { // todo opt for json tag
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataSetVal(pColInfoData, i, data, false);
@ -634,10 +640,22 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
STableScanInfo* pTableScanInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
bool hasNext = false;
int32_t code = TSDB_CODE_SUCCESS;
int64_t st = taosGetTimestampUs();
while (tsdbNextDataBlock(pTableScanInfo->base.dataReader)) {
while (true) {
code = tsdbNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
if (code) {
tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
T_LONG_JMP(pTaskInfo->env, code);
}
if (!hasNext) {
break;
}
if (isTaskKilled(pTaskInfo)) {
tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
@ -1016,7 +1034,15 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
return NULL;
}
if (tsdbNextDataBlock(pReader)) {
bool hasNext = false;
code = tsdbNextDataBlock(pReader, &hasNext);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
T_LONG_JMP(pTaskInfo->env, code);
return NULL;
}
if (hasNext) {
/*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
@ -2076,12 +2102,22 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamRawScanInfo* pInfo = pOperator->info;
int32_t code = TSDB_CODE_SUCCESS;
pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta
pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;
qDebug("tmqsnap doRawScan called");
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
bool hasNext = false;
if (pInfo->dataReader) {
code = tsdbNextDataBlock(pInfo->dataReader, &hasNext);
if (code) {
tsdbReleaseDataBlock(pInfo->dataReader);
longjmp(pTaskInfo->env, code);
}
}
if (pInfo->dataReader && hasNext) {
if (isTaskKilled(pTaskInfo)) {
tsdbReleaseDataBlock(pInfo->dataReader);
longjmp(pTaskInfo->env, pTaskInfo->code);
@ -2572,8 +2608,21 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
pInfo->base.dataReader = source->dataReader;
STsdbReader* reader = pInfo->base.dataReader;
bool hasNext = false;
qTrace("tsdb/read-table-data: %p, enter next reader", reader);
while (tsdbNextDataBlock(reader)) {
while (true) {
code = tsdbNextDataBlock(reader, &hasNext);
if (code != 0) {
tsdbReleaseDataBlock(reader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, code);
}
if (!hasNext) {
break;
}
if (isTaskKilled(pTaskInfo)) {
tsdbReleaseDataBlock(reader);
pInfo->base.dataReader = NULL;

View File

@ -1627,7 +1627,7 @@ static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScan
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(varTbName, name);
colDataSetNItems(pColumnInfoData, 0, varTbName, pBlock->info.rows);
colDataSetNItems(pColumnInfoData, 0, varTbName, pBlock->info.rows, true);
}
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);

View File

@ -2500,7 +2500,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.sprocessFunc = firstLastScalarFunction,
.pPartialFunc = "_last_row_partial",
.pMergeFunc = "_last_row_merge",
.finalizeFunc = firstLastFinalize
.finalizeFunc = firstLastFinalize,
.combineFunc = lastCombine
},
{
.name = "_cache_last_row",
@ -2809,7 +2810,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "mode",
.type = FUNCTION_TYPE_MODE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateMode,
.getEnvFunc = getModeFuncEnv,
.initFunc = modeFunctionSetup,
@ -3212,7 +3213,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "_block_dist",
.type = FUNCTION_TYPE_BLOCK_DIST,
.classification = FUNC_MGT_AGG_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateBlockDistFunc,
.getEnvFunc = getBlockDistFuncEnv,
.initFunc = blockDistSetup,

View File

@ -631,10 +631,10 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
for (int c = 0; c < boundInfo->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[c];
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
if(tFields == NULL || findFileds(pColSchema, tFields, numFields)){
if(tFields == NULL){
for (int j = 0; j < boundInfo->numOfBound; j++){
SSchema* pColSchema = &pSchema[j];
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
uError("type or bytes not equal");
ret = TSDB_CODE_INVALID_PARA;
@ -652,12 +652,52 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
fields += sizeof(int8_t) + sizeof(int32_t);
if (needChangeLength) {
pStart += htonl(colLength[c]);
pStart += htonl(colLength[j]);
} else {
pStart += colLength[c];
pStart += colLength[j];
}
}
}else{
for (int i = 0; i < numFields; i++) {
for (int j = 0; j < boundInfo->numOfBound; j++){
SSchema* pColSchema = &pSchema[j];
if(strcmp(pColSchema->name, tFields[i].name) == 0){
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
uError("type or bytes not equal");
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
int8_t* offset = pStart;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
pStart += numOfRows * sizeof(int32_t);
} else {
pStart += BitmapLen(numOfRows);
}
char* pData = pStart;
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);
tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
fields += sizeof(int8_t) + sizeof(int32_t);
if (needChangeLength) {
pStart += htonl(colLength[i]);
} else {
pStart += colLength[i];
}
boundInfo->pColIndex[j] = -1;
break;
}
}
}
for (int c = 0; c < boundInfo->numOfBound; ++c) {
if( boundInfo->pColIndex[c] != -1){
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
}else{
boundInfo->pColIndex[c] = c; // restore for next block
}
}else{
tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, NULL, NULL);
}
}

View File

@ -1755,7 +1755,11 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
char* p = colDataGetVarData(pInput->columnData, 0);
colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows);
int32_t code = colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows, true);
if (code) {
return code;
}
pOutput->numOfRows += pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}

View File

@ -275,7 +275,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
return 0;
}
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp) {
//
return 0;
}
// int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp) {
// //
// return 0;
// }

View File

@ -105,6 +105,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SRpcMsg rpcRsp = {0};
bool accepted = false;
SSyncRaftEntry* pEntry = NULL;
bool resetElect = false;
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
@ -137,7 +138,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
}
syncNodeStepDown(ths, pMsg->term);
syncNodeResetElectTimer(ths);
resetElect = true;
if (pMsg->dataLen < sizeof(SSyncRaftEntry)) {
sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d",
@ -184,10 +185,9 @@ _SEND_RESPONSE:
// commit index, i.e. leader notice me
if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
sError("vgId:%d, failed to commit raft fsm log since %s.", ths->vgId, terrstr());
goto _out;
}
_out:
if (resetElect) syncNodeResetElectTimer(ths);
return 0;
_IGNORE:

View File

@ -115,6 +115,5 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
ASSERT(ret == 0);
syncNodeResetElectTimer(pSyncNode);
return ret;
}

View File

@ -182,6 +182,9 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
case TDMT_SYNC_TIMEOUT:
code = syncNodeOnTimeout(pSyncNode, pMsg);
break;
case TDMT_SYNC_TIMEOUT_ELECTION:
code = syncNodeOnTimeout(pSyncNode, pMsg);
break;
case TDMT_SYNC_CLIENT_REQUEST:
code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
break;
@ -1593,8 +1596,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
syncNodeStopHeartbeatTimer(pSyncNode);
// reset elect timer
syncNodeResetElectTimer(pSyncNode);
// trace log
sNTrace(pSyncNode, "become follower %s", debugStr);
// send rsp to client
syncNodeLeaderChangeRsp(pSyncNode);
@ -1610,8 +1613,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// reset log buffer
syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
// trace log
sNTrace(pSyncNode, "become follower %s", debugStr);
// reset elect timer
syncNodeResetElectTimer(pSyncNode);
}
// TLA+ Spec
@ -2277,6 +2280,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncHeartbeat* pMsg = pRpcMsg->pCont;
bool resetElect = false;
const STraceId* trace = &pRpcMsg->info.traceId;
char tbuf[40] = {0};
@ -2300,12 +2304,11 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if (pMsg->term == currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
resetElect = true;
syncNodeResetElectTimer(ths);
ths->minMatchIndex = pMsg->minMatchIndex;
if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
// syncNodeFollowerCommit(ths, pMsg->commitIndex);
SRpcMsg rpcMsgLocalCmd = {0};
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
@ -2328,7 +2331,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
}
if (pMsg->term >= currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
// syncNodeStepDown(ths, pMsg->term);
SRpcMsg rpcMsgLocalCmd = {0};
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
@ -2348,15 +2350,10 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
}
}
/*
// htonl
SMsgHead* pHead = rpcMsg.pCont;
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
*/
// reply
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
if (resetElect) syncNodeResetElectTimer(ths);
return 0;
}

View File

@ -22,7 +22,7 @@ int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t l
SSyncNode* pNode) {
int32_t bytes = sizeof(SyncTimeout);
pMsg->pCont = rpcMallocCont(bytes);
pMsg->msgType = TDMT_SYNC_TIMEOUT;
pMsg->msgType = (timeoutType == SYNC_TIMEOUT_ELECTION) ? TDMT_SYNC_TIMEOUT_ELECTION : TDMT_SYNC_TIMEOUT;
pMsg->contLen = bytes;
if (pMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -31,7 +31,7 @@ int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t l
SyncTimeout* pTimeout = pMsg->pCont;
pTimeout->bytes = bytes;
pTimeout->msgType = TDMT_SYNC_TIMEOUT;
pTimeout->msgType = pMsg->msgType;
pTimeout->vgId = pNode->vgId;
pTimeout->timeoutType = timeoutType;
pTimeout->logicClock = logicClock;

View File

@ -89,6 +89,7 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* ths, SyncRequestVote* pMsg) {
int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t ret = 0;
SyncRequestVote* pMsg = pRpcMsg->pCont;
bool resetElect = false;
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
@ -115,7 +116,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncNodeStepDown(ths, currentTerm);
// forbid elect for this round
syncNodeResetElectTimer(ths);
resetElect = true;
}
// send msg
@ -134,5 +135,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, "");
syncLogSendRequestVoteReply(ths, pReply, "");
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
if (resetElect) syncNodeResetElectTimer(ths);
return 0;
}

View File

@ -798,7 +798,6 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
if (pMsg->term > raftStoreGetTerm(pSyncNode)) {
syncNodeStepDown(pSyncNode, pMsg->term);
}
syncNodeResetElectTimer(pSyncNode);
// state, term, seq/ack
int32_t code = 0;
@ -840,6 +839,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
code = -1;
}
syncNodeResetElectTimer(pSyncNode);
return code;
}

View File

@ -120,9 +120,6 @@ int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pRpc) {
if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->pingTimerCounter);
// syncNodePingAll(ths);
// syncNodePingPeers(ths);
syncNodeTimerRoutine(ths);
}
@ -138,8 +135,6 @@ int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pRpc) {
++(ths->heartbeatTimerCounter);
sTrace("vgId:%d, sync timer, type:replicate count:%" PRIu64 ", lc-user:%" PRIu64, ths->vgId,
ths->heartbeatTimerCounter, ths->heartbeatTimerLogicClockUser);
// syncNodeReplicate(ths, true);
}
} else {

View File

@ -28,19 +28,23 @@
#define HASH_INDEX(v, c) ((v) & ((c)-1))
#define FREE_HASH_NODE(_n) \
do { \
taosMemoryFreeClear(_n); \
#define FREE_HASH_NODE(_n, fp) \
do { \
if (fp) { \
fp((_n)->data); \
} \
taosMemoryFreeClear(_n); \
} while (0);
struct SSHashObj {
SHNode **hashList;
size_t capacity; // number of slots
int64_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
_equal_fn_t equalFp; // equal function
SArray* pHashNodeBuf;// hash node allocation buffer, 1k size of each page by default
int32_t offset; // allocation offset in current page
SHNode **hashList;
size_t capacity; // number of slots
int64_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
_equal_fn_t equalFp; // equal function
_hash_free_fn_t freeFp; // free function
SArray *pHashNodeBuf; // hash node allocation buffer, 1k size of each page by default
int32_t offset; // allocation offset in current page
};
static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
@ -71,7 +75,7 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) {
pHashObj->capacity = taosHashCapacity((int32_t)capacity);
pHashObj->equalFp = memcmp;
pHashObj->pHashNodeBuf = taosArrayInit(10, sizeof(void*));
pHashObj->freeFp = NULL;
pHashObj->offset = 0;
pHashObj->size = 0;
@ -92,6 +96,10 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) {
return (int32_t) pHashObj->size;
}
void tSimpleHashSetFreeFp(SSHashObj* pHashObj, _hash_free_fn_t freeFp) {
pHashObj->freeFp = freeFp;
}
static void* doInternalAlloc(SSHashObj* pHashObj, int32_t size) {
#if 0
void** p = taosArrayGetLast(pHashObj->pHashNodeBuf);
@ -306,7 +314,8 @@ int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) {
} else {
pPrev->next = pNode->next;
}
FREE_HASH_NODE(pNode);
FREE_HASH_NODE(pNode, pHashObj->freeFp);
pHashObj->size -= 1;
code = TSDB_CODE_SUCCESS;
break;
@ -341,7 +350,7 @@ int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t ke
*pIter = pPrev ? GET_SHASH_NODE_DATA(pPrev) : NULL;
}
FREE_HASH_NODE(pNode);
FREE_HASH_NODE(pNode, pHashObj->freeFp);
pHashObj->size -= 1;
break;
}
@ -370,14 +379,13 @@ void tSimpleHashClear(SSHashObj *pHashObj) {
while (pNode) {
pNext = pNode->next;
FREE_HASH_NODE(pNode);
FREE_HASH_NODE(pNode, pHashObj->freeFp);
pNode = pNext;
}
pHashObj->hashList[i] = NULL;
}
taosArrayClearEx(pHashObj->pHashNodeBuf, destroyItems);
pHashObj->offset = 0;
pHashObj->size = 0;
}
@ -388,7 +396,6 @@ void tSimpleHashCleanup(SSHashObj *pHashObj) {
}
tSimpleHashClear(pHashObj);
taosArrayDestroy(pHashObj->pHashNodeBuf);
taosMemoryFreeClear(pHashObj->hashList);
taosMemoryFree(pHashObj);
}

View File

@ -26,8 +26,9 @@ int32_t taosVersionStrToInt(const char *vstr, int32_t *vint) {
int32_t vnum[4] = {0};
int32_t len = strlen(vstr);
char tmp[16] = {0};
int32_t vpos = 0;
for (int32_t spos = 0, tpos = 0, vpos = 0; spos < len && vpos < 4; ++spos) {
for (int32_t spos = 0, tpos = 0; spos < len && vpos < 4; ++spos) {
if (vstr[spos] != '.') {
tmp[spos - tpos] = vstr[spos];
} else {
@ -38,6 +39,10 @@ int32_t taosVersionStrToInt(const char *vstr, int32_t *vint) {
}
}
if ('\0' != tmp[0] && vpos < 4) {
vnum[vpos] = atoi(tmp);
}
if (vnum[0] <= 0) {
terrno = TSDB_CODE_INVALID_VERSION_STRING;
return -1;
@ -66,16 +71,16 @@ int32_t taosCheckVersionCompatible(int32_t clientVer, int32_t serverVer, int32_t
case 4:
break;
case 3:
clientVer %= 100;
serverVer %= 100;
clientVer /= 100;
serverVer /= 100;
break;
case 2:
clientVer %= 10000;
serverVer %= 10000;
clientVer /= 10000;
serverVer /= 10000;
break;
case 1:
clientVer %= 1000000;
serverVer %= 1000000;
clientVer /= 1000000;
serverVer /= 1000000;
break;
default:
terrno = TSDB_CODE_INVALID_VERSION_NUMBER;

View File

@ -16,7 +16,7 @@
"num_of_records_per_req": 10,
"databases": [{
"dbinfo": {
"name": "db",
"name": "opentsdb_telnet",
"drop": "yes"
},
"super_tables": [{

View File

@ -14,6 +14,8 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3
@ -22,6 +24,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
@ -1103,9 +1106,9 @@
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/json_tag.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/query_json.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/sample_csv_json.py
#,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/sml_json_alltypes.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/sml_json_alltypes.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/taosdemoTestQueryWithJson.py -R
#,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/telnet_tcp.py -R
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/telnet_tcp.py -R
#docs-examples test
,,n,docs-examples-test,bash python.sh

View File

@ -284,7 +284,7 @@ sql create table ccc using st tags(3,2,2);
sql create table ddd using st tags(4,2,2);
sql create stream streams1 ignore expired 0 fill_history 0 watermark 3s into streamst subtable(c) as select _wstart, c , count(*) c1 from st partition by c interval(1s) ;
sql create stream streams1 ignore expired 0 fill_history 0 watermark 3s into streamst subtable(c) as select _wstart, c , count(*) c1, last_row(b) c2 from st partition by c interval(1s) ;
sql insert into aaa values(1648791221001,2,2,"/a1/aa/aa");
sql insert into bbb values(1648791221001,2,2,"/a1/aa/aa");

View File

@ -3,7 +3,6 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode1 -s start
#==system sh/exec.sh -n dnode1 -s start -v
sleep 50
sql connect
@ -11,7 +10,6 @@ sql connect
sql create dnode $hostname2 port 7200
system sh/exec.sh -n dnode2 -s start
#==system sh/exec.sh -n dnode2 -s start -v
print ===== step1
$x = 0
@ -37,14 +35,14 @@ endi
print ===== step2
sql create database test vgroups 4;
sql create database test vgroups 10;
sql use test;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create table ts3 using st tags(3,2,2);
sql create table ts4 using st tags(4,2,2);
sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 into streamtST1 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s);
sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 delete_mark 10s into streamtST1 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s);
sleep 1000
@ -235,6 +233,29 @@ endi
print loop3 over
sql insert into ts1 values(1648791200001,1,12,3,1.0);
sql insert into ts2 values(1648791200001,1,12,3,1.0);
sql insert into ts3 values(1648791200001,1,12,3,1.0);
sql insert into ts4 values(1648791200001,1,12,3,1.0);
$loop_count = 0
loop31:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamtST1;
if $rows <= 4 then
print =====rows=$rows
goto loop31
endi
print loop31 over
sql drop stream if exists streams1;
sql drop database if exists test1;
@ -243,7 +264,7 @@ sql use test1;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 into streamt1 as select _wstart as c0, count(*) c1, count(a) c2 from st interval(10s) ;
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 delete_mark 20s into streamt1 as select _wstart as c0, count(*) c1, count(a) c2 from st interval(10s) ;
sql insert into t1 values(1648791211000,1,2,3);

View File

@ -0,0 +1,181 @@
import taos
import sys
import time
import socket
import os
import platform
if platform.system().lower() == 'windows':
import wexpect as taosExpect
else:
import pexpect as taosExpect
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
def taos_command (buildPath, key, value, expectString, sqlString=''):
if len(key) == 0:
tdLog.exit("taos test key is null!")
if platform.system().lower() == 'windows':
taosCmd = buildPath + '\\build\\bin\\taos.exe '
taosCmd = taosCmd.replace('\\','\\\\')
else:
taosCmd = buildPath + '/build/bin/taos '
cfgPath = buildPath + "/../sim/psim/cfg"
taosCmd = taosCmd + ' -c' + cfgPath + ' -' + key
if len(value) != 0:
taosCmd = taosCmd + ' ' + value
tdLog.info ("taos cmd: %s" % taosCmd)
child = taosExpect.spawn(taosCmd, timeout=20)
#output = child.readline()
#print (output.decode())
if len(expectString) != 0:
i = child.expect([expectString, taosExpect.TIMEOUT, taosExpect.EOF], timeout=20)
else:
i = child.expect([taosExpect.TIMEOUT, taosExpect.EOF], timeout=20)
if platform.system().lower() == 'windows':
retResult = child.before
else:
retResult = child.before.decode()
print(retResult)
#print(child.after.decode())
if i == 0:
print ('taos login success! Here can run sql, taos> ')
return "TAOS_OK"
else:
return "TAOS_FAIL"
class TDTestCase:
#updatecfgDict = {'clientCfg': {'serverPort': 7080, 'firstEp': 'trd02:7080', 'secondEp':'trd02:7080'},\
# 'serverPort': 7080, 'firstEp': 'trd02:7080'}
hostname = socket.gethostname()
if (platform.system().lower() == 'windows' and not tdDnodes.dnodes[0].remoteIP == ""):
try:
config = eval(tdDnodes.dnodes[0].remoteIP)
hostname = config["host"]
except Exception:
hostname = tdDnodes.dnodes[0].remoteIP
serverPort = '7080'
rpcDebugFlagVal = '143'
clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
clientCfgDict["serverPort"] = serverPort
clientCfgDict["firstEp"] = hostname + ':' + serverPort
clientCfgDict["secondEp"] = hostname + ':' + serverPort
clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
clientCfgDict["fqdn"] = hostname
updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
updatecfgDict["clientCfg"] = clientCfgDict
updatecfgDict["serverPort"] = serverPort
updatecfgDict["firstEp"] = hostname + ':' + serverPort
updatecfgDict["secondEp"] = hostname + ':' + serverPort
updatecfgDict["fqdn"] = hostname
print ("===================: ", updatecfgDict)
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
# time.sleep(2)
tdSql.query("create user testpy pass 'testpy'")
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
cfgPath = buildPath + "/../sim/psim/cfg"
tdLog.info("cfgPath: %s" % cfgPath)
checkNetworkStatus = ['0: unavailable', '1: network ok', '2: service ok', '3: service degraded', '4: exiting']
netrole = ['client', 'server']
keyDict = {'h':'', 'P':'6030', 'p':'testpy', 'u':'testpy', 'a':'', 'A':'', 'c':'', 'C':'', 's':'', 'r':'', 'f':'', \
'k':'', 't':'', 'n':'', 'l':'1024', 'N':'100', 'V':'', 'd':'db', 'w':'30', '-help':'', '-usage':'', '?':''}
keyDict['h'] = self.hostname
keyDict['c'] = cfgPath
keyDict['P'] = self.serverPort
tdSql.query("drop database if exists db1")
tdSql.query("create database if not exists db1 vgroups 1")
tdSql.query("use db1")
tdSql.query("create table tba (ts timestamp, f1 binary(2))")
tdSql.query("insert into tba values (now, '22')")
tdSql.query("select * from tba")
tdSql.checkData(0, 1, '22')
keyDict['s'] = "\"alter table db1.tba modify column f1 binary(5) \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
keyDict['s'] = "\"insert into db1.tba values (now, '55555')\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
tdSql.query("select * from tba order by ts")
tdSql.checkData(0, 1, '22')
tdSql.checkData(1, 1, '55555')
tdSql.query("create table stb (ts timestamp, f1 int) tags (tg1 binary(2))")
tdSql.query("create table tb1 using stb tags('bb')")
tdSql.query("insert into tb1 values (now, 2)")
tdSql.query("select count(*) from stb group by tg1")
tdSql.checkData(0, 0, 1)
keyDict['s'] = "\"alter table db1.stb modify tag tg1 binary(5) \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
keyDict['s'] = "\"create table db1.tb2 using db1.stb tags('bbbbb')\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
keyDict['s'] = "\"insert into db1.tb2 values (now, 2)\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
tdSql.query("select count(*) from stb group by tg1")
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, 1)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())