Merge branch '3.0' into test/jcy
This commit is contained in:
commit
eeba1cdb2f
|
|
@ -194,6 +194,7 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres);
|
||||||
|
|
||||||
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
|
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
|
||||||
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
|
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
|
||||||
|
DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param);
|
||||||
|
|
||||||
// Shuduo: temporary enable for app build
|
// Shuduo: temporary enable for app build
|
||||||
#if 1
|
#if 1
|
||||||
|
|
|
||||||
|
|
@ -127,6 +127,8 @@ typedef enum EFunctionType {
|
||||||
FUNCTION_TYPE_APERCENTILE_MERGE,
|
FUNCTION_TYPE_APERCENTILE_MERGE,
|
||||||
FUNCTION_TYPE_SPREAD_PARTIAL,
|
FUNCTION_TYPE_SPREAD_PARTIAL,
|
||||||
FUNCTION_TYPE_SPREAD_MERGE,
|
FUNCTION_TYPE_SPREAD_MERGE,
|
||||||
|
FUNCTION_TYPE_HISTOGRAM_PARTIAL,
|
||||||
|
FUNCTION_TYPE_HISTOGRAM_MERGE,
|
||||||
|
|
||||||
// user defined funcion
|
// user defined funcion
|
||||||
FUNCTION_TYPE_UDF = 10000
|
FUNCTION_TYPE_UDF = 10000
|
||||||
|
|
|
||||||
|
|
@ -339,6 +339,7 @@ typedef struct {
|
||||||
int32_t sourceTaskId;
|
int32_t sourceTaskId;
|
||||||
int32_t sourceVg;
|
int32_t sourceVg;
|
||||||
int32_t sourceChildId;
|
int32_t sourceChildId;
|
||||||
|
int32_t upstreamNodeId;
|
||||||
#if 0
|
#if 0
|
||||||
int64_t sourceVer;
|
int64_t sourceVer;
|
||||||
#endif
|
#endif
|
||||||
|
|
|
||||||
|
|
@ -171,6 +171,7 @@ typedef struct SReqResultInfo {
|
||||||
uint32_t current;
|
uint32_t current;
|
||||||
bool completed;
|
bool completed;
|
||||||
int32_t precision;
|
int32_t precision;
|
||||||
|
bool convertUcs4;
|
||||||
int32_t payloadLen;
|
int32_t payloadLen;
|
||||||
} SReqResultInfo;
|
} SReqResultInfo;
|
||||||
|
|
||||||
|
|
@ -222,7 +223,7 @@ typedef struct SSyncQueryParam {
|
||||||
SRequestObj* pRequest;
|
SRequestObj* pRequest;
|
||||||
} SSyncQueryParam;
|
} SSyncQueryParam;
|
||||||
|
|
||||||
void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
|
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
|
||||||
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
|
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
|
||||||
|
|
||||||
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
|
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
|
||||||
|
|
|
||||||
|
|
@ -191,6 +191,8 @@ void *createRequest(STscObj *pObj, int32_t type) {
|
||||||
pRequest->requestId = generateRequestId();
|
pRequest->requestId = generateRequestId();
|
||||||
pRequest->metric.start = taosGetTimestampUs();
|
pRequest->metric.start = taosGetTimestampUs();
|
||||||
|
|
||||||
|
pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default
|
||||||
|
|
||||||
pRequest->type = type;
|
pRequest->type = type;
|
||||||
pRequest->pTscObj = pObj;
|
pRequest->pTscObj = pObj;
|
||||||
pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
|
pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
|
||||||
|
|
|
||||||
|
|
@ -1114,7 +1114,7 @@ static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
|
||||||
tsem_post(&pParam->sem);
|
tsem_post(&pParam->sem);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
|
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
|
||||||
assert(pRequest != NULL);
|
assert(pRequest != NULL);
|
||||||
|
|
||||||
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
||||||
|
|
@ -1126,6 +1126,10 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncQueryParam* pParam = pRequest->body.param;
|
SSyncQueryParam* pParam = pRequest->body.param;
|
||||||
|
|
||||||
|
// convert ucs4 to native multi-bytes string
|
||||||
|
pResultInfo->convertUcs4 = convertUcs4;
|
||||||
|
|
||||||
taos_fetch_rows_a(pRequest, syncFetchFn, pParam);
|
taos_fetch_rows_a(pRequest, syncFetchFn, pParam);
|
||||||
tsem_wait(&pParam->sem);
|
tsem_wait(&pParam->sem);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -219,14 +219,13 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
||||||
|
|
||||||
if (TD_RES_QUERY(res)) {
|
if (TD_RES_QUERY(res)) {
|
||||||
SRequestObj *pRequest = (SRequestObj *)res;
|
SRequestObj *pRequest = (SRequestObj *)res;
|
||||||
|
#if SYNC_ON_TOP_OF_ASYNC
|
||||||
|
return doAsyncFetchRows(pRequest, true, true);
|
||||||
|
#else
|
||||||
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
|
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
|
||||||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
|
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if SYNC_ON_TOP_OF_ASYNC
|
|
||||||
return doAsyncFetchRow(pRequest, true, true);
|
|
||||||
#else
|
|
||||||
return doFetchRows(pRequest, true, true);
|
return doFetchRows(pRequest, true, true);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
@ -489,6 +488,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
|
||||||
if (res == NULL) {
|
if (res == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TD_RES_QUERY(res)) {
|
if (TD_RES_QUERY(res)) {
|
||||||
SRequestObj *pRequest = (SRequestObj *)res;
|
SRequestObj *pRequest = (SRequestObj *)res;
|
||||||
|
|
||||||
|
|
@ -501,7 +501,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
|
||||||
}
|
}
|
||||||
|
|
||||||
#if SYNC_ON_TOP_OF_ASYNC
|
#if SYNC_ON_TOP_OF_ASYNC
|
||||||
doAsyncFetchRow(pRequest, false, true);
|
doAsyncFetchRows(pRequest, false, true);
|
||||||
#else
|
#else
|
||||||
doFetchRows(pRequest, true, true);
|
doFetchRows(pRequest, true, true);
|
||||||
#endif
|
#endif
|
||||||
|
|
@ -552,7 +552,11 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if SYNC_ON_TOP_OF_ASYNC
|
||||||
|
doAsyncFetchRows(pRequest, false, false);
|
||||||
|
#else
|
||||||
doFetchRows(pRequest, false, false);
|
doFetchRows(pRequest, false, false);
|
||||||
|
#endif
|
||||||
|
|
||||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||||
|
|
||||||
|
|
@ -771,11 +775,11 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
pRequest->code = code;
|
|
||||||
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0);
|
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResultInfo->pData, true, false);
|
pRequest->code = setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, false);
|
||||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
pResultInfo->numOfRows = 0;
|
pResultInfo->numOfRows = 0;
|
||||||
pRequest->code = code;
|
pRequest->code = code;
|
||||||
|
|
@ -815,6 +819,13 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
||||||
schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest);
|
schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param) {
|
||||||
|
ASSERT(res != NULL && fp != NULL);
|
||||||
|
SRequestObj *pRequest = res;
|
||||||
|
pRequest->body.resInfo.convertUcs4 = false;
|
||||||
|
taos_fetch_rows_a(res, fp, param);
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp,
|
TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp,
|
||||||
void *param, int interval) {
|
void *param, int interval) {
|
||||||
// TODO
|
// TODO
|
||||||
|
|
|
||||||
|
|
@ -494,7 +494,6 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
|
||||||
tmqAskEp(tmq, true);
|
tmqAskEp(tmq, true);
|
||||||
taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
|
taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
|
||||||
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
|
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
|
||||||
/*tmq_commit(tmq, NULL, true);*/
|
|
||||||
tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
|
tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
|
||||||
taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
|
taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
|
||||||
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
|
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
|
||||||
|
|
@ -667,94 +666,6 @@ FAIL:
|
||||||
|
|
||||||
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
|
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
|
||||||
return tmqCommitInner(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
|
return tmqCommitInner(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
|
||||||
#if 0
|
|
||||||
// TODO: add read write lock
|
|
||||||
SRequestObj* pRequest = NULL;
|
|
||||||
tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS;
|
|
||||||
// build msg
|
|
||||||
// send to mnode
|
|
||||||
SMqCMCommitOffsetReq req;
|
|
||||||
SArray* pOffsets = NULL;
|
|
||||||
|
|
||||||
if (offsets == NULL) {
|
|
||||||
pOffsets = taosArrayInit(0, sizeof(SMqOffset));
|
|
||||||
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
|
||||||
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
|
||||||
SMqOffset offset;
|
|
||||||
strcpy(offset.topicName, pTopic->topicName);
|
|
||||||
strcpy(offset.cgroup, tmq->groupId);
|
|
||||||
offset.vgId = pVg->vgId;
|
|
||||||
offset.offset = pVg->currentOffset;
|
|
||||||
taosArrayPush(pOffsets, &offset);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
req.num = pOffsets->size;
|
|
||||||
req.offsets = pOffsets->pData;
|
|
||||||
} else {
|
|
||||||
req.num = taosArrayGetSize(&offsets->container);
|
|
||||||
req.offsets = (SMqOffset*)offsets->container.pData;
|
|
||||||
}
|
|
||||||
|
|
||||||
SEncoder encoder;
|
|
||||||
|
|
||||||
tEncoderInit(&encoder, NULL, 0);
|
|
||||||
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
|
|
||||||
int32_t tlen = encoder.pos;
|
|
||||||
void* buf = taosMemoryMalloc(tlen);
|
|
||||||
if (buf == NULL) {
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
|
|
||||||
tEncoderInit(&encoder, buf, tlen);
|
|
||||||
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
|
|
||||||
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
|
|
||||||
if (pRequest == NULL) {
|
|
||||||
tscError("failed to malloc request");
|
|
||||||
}
|
|
||||||
|
|
||||||
SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
|
|
||||||
if (pParam == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
pParam->tmq = tmq;
|
|
||||||
tsem_init(&pParam->rspSem, 0, 0);
|
|
||||||
pParam->async = async;
|
|
||||||
pParam->offsets = pOffsets;
|
|
||||||
|
|
||||||
pRequest->body.requestMsg = (SDataBuf){
|
|
||||||
.pData = buf,
|
|
||||||
.len = tlen,
|
|
||||||
.handle = NULL,
|
|
||||||
};
|
|
||||||
|
|
||||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
|
||||||
sendInfo->requestObjRefId = 0;
|
|
||||||
sendInfo->param = pParam;
|
|
||||||
sendInfo->fp = tmqCommitCb;
|
|
||||||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
|
||||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
|
||||||
|
|
||||||
if (!async) {
|
|
||||||
tsem_wait(&pParam->rspSem);
|
|
||||||
resp = pParam->rspErr;
|
|
||||||
tsem_destroy(&pParam->rspSem);
|
|
||||||
taosMemoryFree(pParam);
|
|
||||||
|
|
||||||
if (pOffsets) {
|
|
||||||
taosArrayDestroy(pOffsets);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return resp;
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
|
|
@ -859,93 +770,6 @@ void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* para
|
||||||
conf->commitCbUserParam = param;
|
conf->commitCbUserParam = param;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
|
|
||||||
STscObj* pTscObj = (STscObj*)taos;
|
|
||||||
SRequestObj* pRequest = NULL;
|
|
||||||
SQuery* pQueryNode = NULL;
|
|
||||||
char* astStr = NULL;
|
|
||||||
int32_t sqlLen;
|
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
|
||||||
if (taos == NULL || streamName == NULL || sql == NULL) {
|
|
||||||
tscError("invalid parameters for creating stream, connObj:%p, stream name:%s, sql:%s", taos, streamName, sql);
|
|
||||||
terrno = TSDB_CODE_TSC_INVALID_INPUT;
|
|
||||||
goto _return;
|
|
||||||
}
|
|
||||||
sqlLen = strlen(sql);
|
|
||||||
|
|
||||||
if (strlen(tbName) >= TSDB_TABLE_NAME_LEN) {
|
|
||||||
tscError("output tb name too long, max length:%d", TSDB_TABLE_NAME_LEN - 1);
|
|
||||||
terrno = TSDB_CODE_TSC_INVALID_INPUT;
|
|
||||||
goto _return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
|
|
||||||
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
|
|
||||||
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
|
|
||||||
goto _return;
|
|
||||||
}
|
|
||||||
|
|
||||||
tscDebug("start to create stream: %s", streamName);
|
|
||||||
|
|
||||||
int32_t code = 0;
|
|
||||||
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
|
||||||
CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode, NULL), _return);
|
|
||||||
CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);
|
|
||||||
|
|
||||||
/*printf("%s\n", pStr);*/
|
|
||||||
|
|
||||||
SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
|
|
||||||
strcpy(name.dbname, pRequest->pDb);
|
|
||||||
strcpy(name.tname, streamName);
|
|
||||||
|
|
||||||
SCMCreateStreamReq req = {
|
|
||||||
.igExists = 1,
|
|
||||||
.ast = astStr,
|
|
||||||
.sql = (char*)sql,
|
|
||||||
};
|
|
||||||
tNameExtractFullName(&name, req.name);
|
|
||||||
strcpy(req.targetStbFullName, tbName);
|
|
||||||
|
|
||||||
int tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
|
|
||||||
void* buf = taosMemoryMalloc(tlen);
|
|
||||||
if (buf == NULL) {
|
|
||||||
goto _return;
|
|
||||||
}
|
|
||||||
|
|
||||||
tSerializeSCMCreateStreamReq(buf, tlen, &req);
|
|
||||||
|
|
||||||
pRequest->body.requestMsg = (SDataBuf){
|
|
||||||
.pData = buf,
|
|
||||||
.len = tlen,
|
|
||||||
.handle = NULL,
|
|
||||||
};
|
|
||||||
pRequest->type = TDMT_MND_CREATE_STREAM;
|
|
||||||
|
|
||||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
|
||||||
SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
|
||||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
|
||||||
|
|
||||||
tsem_wait(&pRequest->body.rspSem);
|
|
||||||
|
|
||||||
_return:
|
|
||||||
taosMemoryFreeClear(astStr);
|
|
||||||
qDestroyQuery(pQueryNode);
|
|
||||||
/*if (sendInfo != NULL) {*/
|
|
||||||
/*destroySendMsgInfo(sendInfo);*/
|
|
||||||
/*}*/
|
|
||||||
|
|
||||||
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
|
|
||||||
pRequest->code = terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
return pRequest;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
|
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
|
||||||
if (tmq_message == NULL) return 0;
|
if (tmq_message == NULL) return 0;
|
||||||
|
|
@ -1540,10 +1364,11 @@ const char* tmq_get_table_name(TAOS_RES* res) {
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
DLL_EXPORT void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) {
|
|
||||||
|
void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) {
|
||||||
tmqCommitInner(tmq, offsets, 0, 1, cb, param);
|
tmqCommitInner(tmq, offsets, 0, 1, cb, param);
|
||||||
}
|
}
|
||||||
|
|
||||||
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
|
tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
|
||||||
return tmqCommitInner(tmq, offsets, 0, 0, NULL, NULL);
|
return tmqCommitInner(tmq, offsets, 0, 0, NULL, NULL);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -684,7 +684,8 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) {
|
||||||
*/
|
*/
|
||||||
size_t blockDataGetSerialMetaSize(uint32_t numOfCols) {
|
size_t blockDataGetSerialMetaSize(uint32_t numOfCols) {
|
||||||
// | total rows/total length | block group id | column schema | each column length |
|
// | total rows/total length | block group id | column schema | each column length |
|
||||||
return sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)) + numOfCols * sizeof(int32_t);
|
return sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)) +
|
||||||
|
numOfCols * sizeof(int32_t);
|
||||||
}
|
}
|
||||||
|
|
||||||
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
|
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
|
||||||
|
|
@ -1217,6 +1218,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
|
||||||
pBlock->info.numOfCols = numOfCols;
|
pBlock->info.numOfCols = numOfCols;
|
||||||
pBlock->info.hasVarCol = pDataBlock->info.hasVarCol;
|
pBlock->info.hasVarCol = pDataBlock->info.hasVarCol;
|
||||||
pBlock->info.rowSize = pDataBlock->info.rowSize;
|
pBlock->info.rowSize = pDataBlock->info.rowSize;
|
||||||
|
pBlock->info.groupId = pDataBlock->info.groupId;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData colInfo = {0};
|
SColumnInfoData colInfo = {0};
|
||||||
|
|
@ -1892,12 +1894,12 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen
|
||||||
uint64_t* groupId = (uint64_t*)data;
|
uint64_t* groupId = (uint64_t*)data;
|
||||||
data += sizeof(uint64_t);
|
data += sizeof(uint64_t);
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
*((int16_t*) data) = pColInfoData->info.type;
|
*((int16_t*)data) = pColInfoData->info.type;
|
||||||
data += sizeof(int16_t);
|
data += sizeof(int16_t);
|
||||||
|
|
||||||
*((int32_t*) data) = pColInfoData->info.bytes;
|
*((int32_t*)data) = pColInfoData->info.bytes;
|
||||||
data += sizeof(int32_t);
|
data += sizeof(int32_t);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1943,6 +1945,8 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen
|
||||||
|
|
||||||
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) {
|
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) {
|
||||||
blockDataEnsureCapacity(pBlock, numOfRows);
|
blockDataEnsureCapacity(pBlock, numOfRows);
|
||||||
|
pBlock->info.rows = numOfRows;
|
||||||
|
|
||||||
const char* pStart = pData;
|
const char* pStart = pData;
|
||||||
|
|
||||||
int32_t dataLen = *(int32_t*)pStart;
|
int32_t dataLen = *(int32_t*)pStart;
|
||||||
|
|
@ -1951,13 +1955,25 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t
|
||||||
pBlock->info.groupId = *(uint64_t*)pStart;
|
pBlock->info.groupId = *(uint64_t*)pStart;
|
||||||
pStart += sizeof(uint64_t);
|
pStart += sizeof(uint64_t);
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
if (pBlock->pDataBlock == NULL) {
|
||||||
|
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
|
||||||
|
taosArraySetSize(pBlock->pDataBlock, numOfCols);
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlock->info.numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
|
ASSERT(pBlock->pDataBlock->size >= numOfCols);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
pColInfoData->info.type = *(int16_t*)pStart;
|
pColInfoData->info.type = *(int16_t*)pStart;
|
||||||
pStart += sizeof(int16_t);
|
pStart += sizeof(int16_t);
|
||||||
|
|
||||||
pColInfoData->info.bytes = *(int32_t*)pStart;
|
pColInfoData->info.bytes = *(int32_t*)pStart;
|
||||||
pStart += sizeof(int32_t);
|
pStart += sizeof(int32_t);
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||||
|
pBlock->info.hasVarCol = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataEnsureCapacity(pBlock, numOfRows);
|
blockDataEnsureCapacity(pBlock, numOfRows);
|
||||||
|
|
@ -1982,11 +1998,17 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t
|
||||||
pColInfoData->pData = taosMemoryMalloc(colLen[i]);
|
pColInfoData->pData = taosMemoryMalloc(colLen[i]);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
if (pColInfoData->nullbitmap == NULL) {
|
||||||
|
pColInfoData->nullbitmap = taosMemoryCalloc(1, BitmapLen(numOfRows));
|
||||||
|
}
|
||||||
memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
|
memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
|
||||||
pStart += BitmapLen(numOfRows);
|
pStart += BitmapLen(numOfRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (colLen[i] > 0) {
|
if (colLen[i] > 0) {
|
||||||
|
if (pColInfoData->pData == NULL) {
|
||||||
|
pColInfoData->pData = taosMemoryCalloc(1, colLen[i]);
|
||||||
|
}
|
||||||
memcpy(pColInfoData->pData, pStart, colLen[i]);
|
memcpy(pColInfoData->pData, pStart, colLen[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -244,7 +244,7 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(flags);
|
// ASSERT(flags); // only 1 column(ts)
|
||||||
|
|
||||||
// decide
|
// decide
|
||||||
uint32_t nData = 0;
|
uint32_t nData = 0;
|
||||||
|
|
@ -268,7 +268,8 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S
|
||||||
nDataT = BIT2_SIZE(pTSchema->numOfCols - 1) + pTSchema->flen + ntv;
|
nDataT = BIT2_SIZE(pTSchema->numOfCols - 1) + pTSchema->flen + ntv;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
ASSERT(0);
|
break; // only ts column
|
||||||
|
// ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint8_t tflags = 0;
|
uint8_t tflags = 0;
|
||||||
|
|
@ -283,7 +284,7 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S
|
||||||
tflags |= TSROW_KV_BIG;
|
tflags |= TSROW_KV_BIG;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nDataT < nDataK) {
|
if (nDataT <= nDataK) {
|
||||||
nData = nDataT;
|
nData = nDataT;
|
||||||
} else {
|
} else {
|
||||||
nData = nDataK;
|
nData = nDataK;
|
||||||
|
|
@ -373,7 +374,8 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S
|
||||||
ptv = pf + pTSchema->flen;
|
ptv = pf + pTSchema->flen;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
ASSERT(0);
|
// ASSERT(0);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pTSKVRow = (STSKVRow *)(*ppRow)->pData;
|
pTSKVRow = (STSKVRow *)(*ppRow)->pData;
|
||||||
|
|
@ -495,7 +497,7 @@ void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal
|
||||||
SValue value;
|
SValue value;
|
||||||
|
|
||||||
ASSERT(iCol < pTSchema->numOfCols);
|
ASSERT(iCol < pTSchema->numOfCols);
|
||||||
ASSERT(flags);
|
// ASSERT(flags); // only 1 ts column
|
||||||
ASSERT(pRow->sver == pTSchema->version);
|
ASSERT(pRow->sver == pTSchema->version);
|
||||||
|
|
||||||
if (iCol == 0) {
|
if (iCol == 0) {
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,12 @@ MESSAGE(STATUS "build parser unit test")
|
||||||
SET(CMAKE_CXX_STANDARD 11)
|
SET(CMAKE_CXX_STANDARD 11)
|
||||||
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||||
|
|
||||||
ADD_EXECUTABLE(commonTest ${SOURCE_LIST})
|
ADD_EXECUTABLE(commonTest "")
|
||||||
|
TARGET_SOURCES(
|
||||||
|
commonTest
|
||||||
|
PRIVATE
|
||||||
|
"commonTests.cpp"
|
||||||
|
)
|
||||||
TARGET_LINK_LIBRARIES(
|
TARGET_LINK_LIBRARIES(
|
||||||
commonTest
|
commonTest
|
||||||
PUBLIC os util common gtest
|
PUBLIC os util common gtest
|
||||||
|
|
@ -24,7 +29,12 @@ target_sources(
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"dataformatTest.cpp"
|
"dataformatTest.cpp"
|
||||||
)
|
)
|
||||||
target_link_libraries(dataformatTest gtest gtest_main util)
|
target_link_libraries(dataformatTest gtest gtest_main util common)
|
||||||
|
target_include_directories(
|
||||||
|
dataformatTest
|
||||||
|
PUBLIC "${TD_SOURCE_DIR}/include/common"
|
||||||
|
PUBLIC "${TD_SOURCE_DIR}/include/util"
|
||||||
|
)
|
||||||
|
|
||||||
# tmsg test
|
# tmsg test
|
||||||
# add_executable(tmsgTest "")
|
# add_executable(tmsgTest "")
|
||||||
|
|
|
||||||
|
|
@ -1 +1,481 @@
|
||||||
#include "gtest/gtest.h"
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#include <taoserror.h>
|
||||||
|
#include <tdataformat.h>
|
||||||
|
#include <tglobal.h>
|
||||||
|
#include <tmsg.h>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
#pragma GCC diagnostic push
|
||||||
|
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||||
|
#pragma GCC diagnostic ignored "-Wunused-function"
|
||||||
|
#pragma GCC diagnostic ignored "-Wunused-variable"
|
||||||
|
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||||
|
|
||||||
|
#define NONE_CSTR "no"
|
||||||
|
#define NULL_CSTR "nu"
|
||||||
|
#define NONE_LEN 2
|
||||||
|
#define NULL_LEN 2
|
||||||
|
const static int16_t MAX_COLS = 14;
|
||||||
|
|
||||||
|
int main(int argc, char **argv) {
|
||||||
|
testing::InitGoogleTest(&argc, argv);
|
||||||
|
return RUN_ALL_TESTS();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(10), c6 nchar(10), c7 tinyint, c8 smallint, c9 bool
|
||||||
|
STSchema *genSTSchema(int16_t nCols) {
|
||||||
|
EXPECT_LE(nCols, MAX_COLS);
|
||||||
|
SSchema *pSchema = (SSchema *)taosMemoryCalloc(nCols, sizeof(SSchema));
|
||||||
|
EXPECT_NE(pSchema, nullptr);
|
||||||
|
|
||||||
|
for (int16_t i = 0; i < nCols; ++i) {
|
||||||
|
pSchema[i].colId = PRIMARYKEY_TIMESTAMP_COL_ID + i;
|
||||||
|
char colName[TSDB_COL_NAME_LEN] = {0};
|
||||||
|
snprintf(colName, TSDB_COL_NAME_LEN, "c%" PRIi16, i);
|
||||||
|
strncpy(pSchema[i].name, colName, TSDB_COL_NAME_LEN);
|
||||||
|
|
||||||
|
switch (i) {
|
||||||
|
case 0: {
|
||||||
|
pSchema[0].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
pSchema[0].bytes = TYPE_BYTES[pSchema[0].type];
|
||||||
|
} break;
|
||||||
|
case 1: {
|
||||||
|
pSchema[1].type = TSDB_DATA_TYPE_INT;
|
||||||
|
pSchema[1].bytes = TYPE_BYTES[pSchema[1].type];
|
||||||
|
;
|
||||||
|
} break;
|
||||||
|
case 2: {
|
||||||
|
pSchema[2].type = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
pSchema[2].bytes = TYPE_BYTES[pSchema[2].type];
|
||||||
|
} break;
|
||||||
|
case 3: {
|
||||||
|
pSchema[3].type = TSDB_DATA_TYPE_FLOAT;
|
||||||
|
pSchema[3].bytes = TYPE_BYTES[pSchema[3].type];
|
||||||
|
} break;
|
||||||
|
case 4: {
|
||||||
|
pSchema[4].type = TSDB_DATA_TYPE_DOUBLE;
|
||||||
|
pSchema[4].bytes = TYPE_BYTES[pSchema[4].type];
|
||||||
|
} break;
|
||||||
|
case 5: {
|
||||||
|
pSchema[5].type = TSDB_DATA_TYPE_BINARY;
|
||||||
|
pSchema[5].bytes = 12;
|
||||||
|
} break;
|
||||||
|
case 6: {
|
||||||
|
pSchema[6].type = TSDB_DATA_TYPE_NCHAR;
|
||||||
|
pSchema[6].bytes = 42;
|
||||||
|
} break;
|
||||||
|
case 7: {
|
||||||
|
pSchema[7].type = TSDB_DATA_TYPE_TINYINT;
|
||||||
|
pSchema[7].bytes = TYPE_BYTES[pSchema[7].type];
|
||||||
|
} break;
|
||||||
|
case 8: {
|
||||||
|
pSchema[8].type = TSDB_DATA_TYPE_SMALLINT;
|
||||||
|
pSchema[8].bytes = TYPE_BYTES[pSchema[8].type];
|
||||||
|
} break;
|
||||||
|
case 9: {
|
||||||
|
pSchema[9].type = TSDB_DATA_TYPE_BOOL;
|
||||||
|
pSchema[9].bytes = TYPE_BYTES[pSchema[9].type];
|
||||||
|
} break;
|
||||||
|
case 10: {
|
||||||
|
pSchema[10].type = TSDB_DATA_TYPE_UTINYINT;
|
||||||
|
pSchema[10].bytes = TYPE_BYTES[pSchema[10].type];
|
||||||
|
} break;
|
||||||
|
case 11: {
|
||||||
|
pSchema[11].type = TSDB_DATA_TYPE_USMALLINT;
|
||||||
|
pSchema[11].bytes = TYPE_BYTES[pSchema[11].type];
|
||||||
|
} break;
|
||||||
|
case 12: {
|
||||||
|
pSchema[12].type = TSDB_DATA_TYPE_UINT;
|
||||||
|
pSchema[12].bytes = TYPE_BYTES[pSchema[12].type];
|
||||||
|
} break;
|
||||||
|
case 13: {
|
||||||
|
pSchema[13].type = TSDB_DATA_TYPE_UBIGINT;
|
||||||
|
pSchema[13].bytes = TYPE_BYTES[pSchema[13].type];
|
||||||
|
} break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
ASSERT(0);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
STSchema *pResult = NULL;
|
||||||
|
pResult = tdGetSTSChemaFromSSChema(&pSchema, nCols);
|
||||||
|
|
||||||
|
taosMemoryFree(pSchema);
|
||||||
|
return pResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(10), c6 nchar(10), c7 tinyint, c8 smallint, c9 bool
|
||||||
|
static int32_t genTestData(const char **data, int16_t nCols, SArray **pArray) {
|
||||||
|
if (!(*pArray)) {
|
||||||
|
*pArray = taosArrayInit(nCols, sizeof(SColVal));
|
||||||
|
if (!(*pArray)) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int16_t i = 0; i < nCols; ++i) {
|
||||||
|
SColVal colVal = {0};
|
||||||
|
colVal.cid = PRIMARYKEY_TIMESTAMP_COL_ID + i;
|
||||||
|
if (strncasecmp(data[i], NONE_CSTR, NONE_LEN) == 0) {
|
||||||
|
colVal.isNone = 1;
|
||||||
|
taosArrayPush(*pArray, &colVal);
|
||||||
|
continue;
|
||||||
|
} else if (strncasecmp(data[i], NULL_CSTR, NULL_LEN) == 0) {
|
||||||
|
colVal.isNull = 1;
|
||||||
|
taosArrayPush(*pArray, &colVal);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (i) {
|
||||||
|
case 0:
|
||||||
|
sscanf(data[i], "%" PRIi64, &colVal.value.ts);
|
||||||
|
break;
|
||||||
|
case 1: {
|
||||||
|
sscanf(data[i], "%" PRIi32, &colVal.value.i32);
|
||||||
|
} break;
|
||||||
|
case 2:
|
||||||
|
sscanf(data[i], "%" PRIi64, &colVal.value.i64);
|
||||||
|
break;
|
||||||
|
case 3:
|
||||||
|
sscanf(data[i], "%f", &colVal.value.f);
|
||||||
|
break;
|
||||||
|
case 4:
|
||||||
|
sscanf(data[i], "%lf", &colVal.value.d);
|
||||||
|
break;
|
||||||
|
case 5: {
|
||||||
|
int16_t dataLen = strlen(data[i]) + 1;
|
||||||
|
colVal.value.nData = dataLen < 10 ? dataLen : 10;
|
||||||
|
colVal.value.pData = (uint8_t *)data[i];
|
||||||
|
} break;
|
||||||
|
case 6: {
|
||||||
|
int16_t dataLen = strlen(data[i]) + 1;
|
||||||
|
colVal.value.nData = dataLen < 40 ? dataLen : 40;
|
||||||
|
colVal.value.pData = (uint8_t *)data[i]; // just for test, not real nchar
|
||||||
|
} break;
|
||||||
|
case 7:
|
||||||
|
case 9: {
|
||||||
|
int32_t d8;
|
||||||
|
sscanf(data[i], "%" PRId32, &d8);
|
||||||
|
colVal.value.i8 = (int8_t)d8;
|
||||||
|
} break;
|
||||||
|
case 8: {
|
||||||
|
int32_t d16;
|
||||||
|
sscanf(data[i], "%" PRId32, &d16);
|
||||||
|
colVal.value.i16 = (int16_t)d16;
|
||||||
|
} break;
|
||||||
|
case 10: {
|
||||||
|
uint32_t u8;
|
||||||
|
sscanf(data[i], "%" PRId32, &u8);
|
||||||
|
colVal.value.u8 = (uint8_t)u8;
|
||||||
|
} break;
|
||||||
|
case 11: {
|
||||||
|
uint32_t u16;
|
||||||
|
sscanf(data[i], "%" PRId32, &u16);
|
||||||
|
colVal.value.u16 = (uint16_t)u16;
|
||||||
|
} break;
|
||||||
|
case 12: {
|
||||||
|
sscanf(data[i], "%" PRIu32, &colVal.value.u32);
|
||||||
|
} break;
|
||||||
|
case 13: {
|
||||||
|
sscanf(data[i], "%" PRIu64, &colVal.value.u64);
|
||||||
|
} break;
|
||||||
|
default:
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
taosArrayPush(*pArray, &colVal);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t debugPrintSColVal(SColVal *cv, int8_t type) {
|
||||||
|
if (cv->isNone) {
|
||||||
|
printf("None ");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if (cv->isNull) {
|
||||||
|
printf("Null ");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
switch (type) {
|
||||||
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
|
printf("%s ", cv->value.i8 == 0 ? "false" : "true");
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
|
printf("%" PRIi8 " ", cv->value.i8);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
|
printf("%" PRIi16 " ", cv->value.i16);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_INT:
|
||||||
|
printf("%" PRIi32 " ", cv->value.i32);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
|
printf("%" PRIi64 " ", cv->value.i64);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_FLOAT:
|
||||||
|
printf("%f ", cv->value.f);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE:
|
||||||
|
printf("%lf ", cv->value.d);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_VARCHAR: {
|
||||||
|
char tv[15] = {0};
|
||||||
|
snprintf(tv, 15, "%s", cv->value.pData);
|
||||||
|
printf("%s ", tv);
|
||||||
|
} break;
|
||||||
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
|
printf("%" PRIi64 " ", cv->value.i64);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_NCHAR: {
|
||||||
|
char tv[15] = {0};
|
||||||
|
snprintf(tv, 15, "%s", cv->value.pData);
|
||||||
|
printf("%s ", tv);
|
||||||
|
} break;
|
||||||
|
case TSDB_DATA_TYPE_UTINYINT:
|
||||||
|
printf("%" PRIu8 " ", cv->value.u8);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_USMALLINT:
|
||||||
|
printf("%" PRIu16 " ", cv->value.u16);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UINT:
|
||||||
|
printf("%" PRIu32 " ", cv->value.u32);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UBIGINT:
|
||||||
|
printf("%" PRIu64 " ", cv->value.u64);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_JSON:
|
||||||
|
printf("JSON ");
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_VARBINARY:
|
||||||
|
printf("VARBIN ");
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_DECIMAL:
|
||||||
|
printf("DECIMAL ");
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_BLOB:
|
||||||
|
printf("BLOB ");
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_MEDIUMBLOB:
|
||||||
|
printf("MedBLOB ");
|
||||||
|
break;
|
||||||
|
// case TSDB_DATA_TYPE_BINARY:
|
||||||
|
// printf("BINARY ");
|
||||||
|
// break;
|
||||||
|
case TSDB_DATA_TYPE_MAX:
|
||||||
|
printf("UNDEF ");
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
printf("UNDEF ");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void debugPrintTSRow(STSRow2 *row, STSchema *pTSchema, const char *tags, int32_t ln) {
|
||||||
|
printf("%s:%d %s:v%d:%d ", tags, ln, (row->flags & 0xf0) ? "KV" : "TP", row->sver, row->nData);
|
||||||
|
for (int16_t i = 0; i < schemaNCols(pTSchema); ++i) {
|
||||||
|
SColVal cv = {0};
|
||||||
|
tTSRowGet(row, pTSchema, i, &cv);
|
||||||
|
debugPrintSColVal(&cv, pTSchema->columns[i].type);
|
||||||
|
}
|
||||||
|
printf("\n");
|
||||||
|
fflush(stdout);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t checkSColVal(const char *rawVal, SColVal *cv, int8_t type) {
|
||||||
|
ASSERT(rawVal);
|
||||||
|
|
||||||
|
if (cv->isNone) {
|
||||||
|
EXPECT_STRCASEEQ(rawVal, NONE_CSTR);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if (cv->isNull) {
|
||||||
|
EXPECT_STRCASEEQ(rawVal, NULL_CSTR);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
SValue rawSVal = {0};
|
||||||
|
switch (type) {
|
||||||
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
|
case TSDB_DATA_TYPE_TINYINT: {
|
||||||
|
int32_t d8;
|
||||||
|
sscanf(rawVal, "%" PRId32, &d8);
|
||||||
|
EXPECT_EQ(cv->value.i8, (int8_t)d8);
|
||||||
|
} break;
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT: {
|
||||||
|
int32_t d16;
|
||||||
|
sscanf(rawVal, "%" PRId32, &d16);
|
||||||
|
EXPECT_EQ(cv->value.i16, (int16_t)d16);
|
||||||
|
} break;
|
||||||
|
case TSDB_DATA_TYPE_INT: {
|
||||||
|
sscanf(rawVal, "%" PRId32, &rawSVal.i32);
|
||||||
|
EXPECT_EQ(cv->value.i32, rawSVal.i32);
|
||||||
|
} break;
|
||||||
|
case TSDB_DATA_TYPE_BIGINT: {
|
||||||
|
sscanf(rawVal, "%" PRIi64, &rawSVal.i64);
|
||||||
|
EXPECT_EQ(cv->value.i64, rawSVal.i64);
|
||||||
|
} break;
|
||||||
|
case TSDB_DATA_TYPE_FLOAT: {
|
||||||
|
sscanf(rawVal, "%f", &rawSVal.f);
|
||||||
|
EXPECT_FLOAT_EQ(cv->value.f, rawSVal.f);
|
||||||
|
} break;
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE: {
|
||||||
|
sscanf(rawVal, "%lf", &rawSVal.d);
|
||||||
|
EXPECT_DOUBLE_EQ(cv->value.d, rawSVal.d);
|
||||||
|
} break;
|
||||||
|
case TSDB_DATA_TYPE_VARCHAR: {
|
||||||
|
EXPECT_STRCASEEQ(rawVal, (const char *)cv->value.pData);
|
||||||
|
} break;
|
||||||
|
case TSDB_DATA_TYPE_TIMESTAMP: {
|
||||||
|
sscanf(rawVal, "%" PRIi64, &rawSVal.ts);
|
||||||
|
EXPECT_DOUBLE_EQ(cv->value.ts, rawSVal.ts);
|
||||||
|
} break;
|
||||||
|
case TSDB_DATA_TYPE_NCHAR: {
|
||||||
|
EXPECT_STRCASEEQ(rawVal, (const char *)cv->value.pData); // informal nchar comparsion
|
||||||
|
} break;
|
||||||
|
case TSDB_DATA_TYPE_UTINYINT: {
|
||||||
|
uint32_t u8;
|
||||||
|
sscanf(rawVal, "%" PRIu32, &u8);
|
||||||
|
EXPECT_EQ(cv->value.u8, (uint8_t)u8);
|
||||||
|
} break;
|
||||||
|
case TSDB_DATA_TYPE_USMALLINT: {
|
||||||
|
uint32_t u16;
|
||||||
|
sscanf(rawVal, "%" PRIu32, &u16);
|
||||||
|
EXPECT_EQ(cv->value.u16, (uint16_t)u16);
|
||||||
|
} break;
|
||||||
|
case TSDB_DATA_TYPE_UINT: {
|
||||||
|
sscanf(rawVal, "%" PRIu32, &rawSVal.u32);
|
||||||
|
EXPECT_EQ(cv->value.u32, rawSVal.u32);
|
||||||
|
} break;
|
||||||
|
case TSDB_DATA_TYPE_UBIGINT: {
|
||||||
|
sscanf(rawVal, "%" PRIu64, &rawSVal.u64);
|
||||||
|
EXPECT_EQ(cv->value.u64, rawSVal.u64);
|
||||||
|
} break;
|
||||||
|
case TSDB_DATA_TYPE_JSON:
|
||||||
|
printf("JSON ");
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_VARBINARY:
|
||||||
|
printf("VARBIN ");
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_DECIMAL:
|
||||||
|
printf("DECIMAL ");
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_BLOB:
|
||||||
|
printf("BLOB ");
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_MEDIUMBLOB:
|
||||||
|
printf("MedBLOB ");
|
||||||
|
break;
|
||||||
|
// case TSDB_DATA_TYPE_BINARY:
|
||||||
|
// printf("BINARY ");
|
||||||
|
// break;
|
||||||
|
case TSDB_DATA_TYPE_MAX:
|
||||||
|
printf("UNDEF ");
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
printf("UNDEF ");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void checkTSRow(const char **data, STSRow2 *row, STSchema *pTSchema) {
|
||||||
|
for (int16_t i = 0; i < schemaNCols(pTSchema); ++i) {
|
||||||
|
SColVal cv = {0};
|
||||||
|
tTSRowGet(row, pTSchema, i, &cv);
|
||||||
|
checkSColVal(data[i], &cv, pTSchema->columns[i].type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(testCase, AllNormTest) {
|
||||||
|
int16_t nCols = 1;
|
||||||
|
STSRow2 *row = nullptr;
|
||||||
|
SArray *pArray = taosArrayInit(nCols, sizeof(SColVal));
|
||||||
|
EXPECT_NE(pArray, nullptr);
|
||||||
|
|
||||||
|
STSchema *pTSchema = genSTSchema(nCols);
|
||||||
|
EXPECT_NE(pTSchema, nullptr);
|
||||||
|
|
||||||
|
// ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(10), c6 nchar(10), c7 tinyint, c8 smallint,
|
||||||
|
// c9 bool
|
||||||
|
char *data[10] = {"1653694220000", "10", "20", "10.1", "10.1", "binary10", "nchar10", "10", "10", "1"};
|
||||||
|
|
||||||
|
genTestData((const char **)&data, nCols, &pArray);
|
||||||
|
|
||||||
|
tTSRowNew(NULL, pArray, pTSchema, &row);
|
||||||
|
|
||||||
|
debugPrintTSRow(row, pTSchema, __func__, __LINE__);
|
||||||
|
checkTSRow((const char **)&data, row, pTSchema);
|
||||||
|
|
||||||
|
taosArrayDestroy(pArray);
|
||||||
|
taosMemoryFree(pTSchema);
|
||||||
|
}
|
||||||
|
|
||||||
|
#if 1
|
||||||
|
TEST(testCase, NoneTest) {
|
||||||
|
const static int nCols = 14;
|
||||||
|
const static int nRows = 20;
|
||||||
|
STSRow2 *row = nullptr;
|
||||||
|
SArray *pArray = taosArrayInit(nCols, sizeof(SColVal));
|
||||||
|
EXPECT_NE(pArray, nullptr);
|
||||||
|
|
||||||
|
STSchema *pTSchema = genSTSchema(nCols);
|
||||||
|
EXPECT_NE(pTSchema, nullptr);
|
||||||
|
|
||||||
|
// ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(10), c6 nchar(10), c7 tinyint, c8 smallint,
|
||||||
|
// c9 bool c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned
|
||||||
|
const char *data[nRows][nCols] = {
|
||||||
|
{"1653694220000", "no", "20", "10.1", "10.1", "binary10", "no", "10", "10", "nu", "10", "20", "30", "40"},
|
||||||
|
{"1653694220001", "no", "no", "no", "no", "no", "no", "no", "no", "no", "no", "no", "no", "no"},
|
||||||
|
{"1653694220002", "no", "no", "no", "no", "no", "nu", "no", "no", "no", "no", "no", "no", "nu"},
|
||||||
|
{"1653694220003", "nu", "no", "no", "no", "no", "nu", "no", "no", "no", "no", "no", "no", "no"},
|
||||||
|
{"1653694220004", "no", "20", "no", "no", "no", "nchar10", "no", "no", "no", "no", "no", "no", "no"},
|
||||||
|
{"1653694220005", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu"},
|
||||||
|
{"1653694220006", "no", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu"},
|
||||||
|
{"1653694220007", "no", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "no"},
|
||||||
|
{"1653694220008", "no", "nu", "nu", "nu", "binary10", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "no"},
|
||||||
|
{"1653694220009", "no", "nu", "nu", "nu", "binary10", "nu", "nu", "10", "no", "nu", "nu", "nu", "100"},
|
||||||
|
{"1653694220010", "-1", "-1", "-1", "-1", "binary10", "nu", "-1", "0", "0", "0", "0", "0", "0"},
|
||||||
|
{"1653694220011", "-2147483648", "nu", "nu", "nu", "biy10", "nu", "nu", "32767", "no", "nu", "nu", "nu", "100"},
|
||||||
|
{"1653694220012", "2147483647", "nu", "nu", "nu", "ary10", "nu", "nu", "-32768", "no", "nu", "nu", "nu", "100"},
|
||||||
|
{"1653694220013", "no", "-9223372036854775818", "nu", "nu", "b1", "nu", "nu", "10", "no", "nu", "nu", "nu", "nu"},
|
||||||
|
{"1653694220014", "no", "nu", "nu", "nu", "b0", "nu", "nu", "10", "no", "nu", "nu", "nu", "9223372036854775808"},
|
||||||
|
{"1653694220015", "no", "nu", "nu", "nu", "binary30", "char4", "nu", "10", "no", "nu", "nu", "nu",
|
||||||
|
"18446744073709551615"},
|
||||||
|
{"1653694220016", "2147483647", "nu", "nu", "nu", "bin50", "nu", "nu", "10", "no", "nu", "nu", "nu", "100"},
|
||||||
|
{"1653694220017", "2147483646", "0", "0", "0", "binary10", "0", "0", "0", "0", "255", "0", "0", "0"},
|
||||||
|
{"1653694220018", "no", "-9223372036854775808", "nu", "nu", "binary10", "nu", "nu", "10", "no", "nu", "nu",
|
||||||
|
"4294967295", "100"},
|
||||||
|
{"1653694220019", "no", "9223372036854775807", "nu", "nu", "bin10", "nu", "nu", "10", "no", "254", "nu", "nu",
|
||||||
|
"no"}};
|
||||||
|
|
||||||
|
|
||||||
|
for (int r = 0; r < nRows; ++r) {
|
||||||
|
genTestData((const char **)&data[r], nCols, &pArray);
|
||||||
|
tTSRowNew(NULL, pArray, pTSchema, &row);
|
||||||
|
debugPrintTSRow(row, pTSchema, __func__, __LINE__); // debug print
|
||||||
|
checkTSRow((const char **)&data[r], row, pTSchema); // check
|
||||||
|
taosMemoryFreeClear(row);
|
||||||
|
taosArrayClear(pArray);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pArray);
|
||||||
|
taosMemoryFree(pTSchema);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
@ -34,6 +34,7 @@ typedef struct SVnodeMgmt {
|
||||||
SQWorkerPool fetchPool;
|
SQWorkerPool fetchPool;
|
||||||
SWWorkerPool syncPool;
|
SWWorkerPool syncPool;
|
||||||
SWWorkerPool writePool;
|
SWWorkerPool writePool;
|
||||||
|
SWWorkerPool applyPool;
|
||||||
SWWorkerPool mergePool;
|
SWWorkerPool mergePool;
|
||||||
SSingleWorker mgmtWorker;
|
SSingleWorker mgmtWorker;
|
||||||
SSingleWorker monitorWorker;
|
SSingleWorker monitorWorker;
|
||||||
|
|
|
||||||
|
|
@ -352,7 +352,9 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
||||||
|
|
@ -16,10 +16,8 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "vmInt.h"
|
#include "vmInt.h"
|
||||||
|
|
||||||
#include "sync.h"
|
|
||||||
#include "syncTools.h"
|
|
||||||
|
|
||||||
static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
|
static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
|
||||||
|
if (pMsg->info.handle == NULL) return;
|
||||||
SRpcMsg rsp = {
|
SRpcMsg rsp = {
|
||||||
.code = code,
|
.code = code,
|
||||||
.pCont = pMsg->info.rsp,
|
.pCont = pMsg->info.rsp,
|
||||||
|
|
@ -55,7 +53,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
if (IsReq(pMsg)) {
|
if (IsReq(pMsg)) {
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
if (terrno != 0) code = terrno;
|
if (terrno != 0) code = terrno;
|
||||||
dError("msg:%p failed to process since %s", pMsg, terrstr());
|
dError("msg:%p, failed to process since %s", pMsg, terrstr());
|
||||||
}
|
}
|
||||||
vmSendRsp(pMsg, code);
|
vmSendRsp(pMsg, code);
|
||||||
}
|
}
|
||||||
|
|
@ -107,11 +105,9 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
|
||||||
|
|
||||||
int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL);
|
int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
dError("vgId:%d, msg:%p failed to sync since %s", pVnode->vgId, pMsg, terrstr());
|
dError("vgId:%d, msg:%p failed to sync since %s", pVnode->vgId, pMsg, terrstr());
|
||||||
if (pMsg->info.handle != NULL) {
|
vmSendRsp(pMsg, code);
|
||||||
if (terrno != 0) code = terrno;
|
|
||||||
vmSendRsp(pMsg, code);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
dTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
||||||
|
|
@ -130,8 +126,8 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
|
|
||||||
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
|
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
dError("vgId:%d, msg:%p failed to merge since %s", pVnode->vgId, pMsg, terrstr());
|
|
||||||
if (terrno != 0) code = terrno;
|
if (terrno != 0) code = terrno;
|
||||||
|
dError("vgId:%d, msg:%p failed to merge since %s", pVnode->vgId, pMsg, terrstr());
|
||||||
vmSendRsp(pMsg, code);
|
vmSendRsp(pMsg, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -150,7 +146,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
dError("vgId:%d, failed to put msg:%p into vnode queue since %s, type:%s", pHead->vgId, pMsg, terrstr(),
|
dError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s", pHead->vgId, pMsg, terrstr(),
|
||||||
TMSG_INFO(pMsg->msgType));
|
TMSG_INFO(pMsg->msgType));
|
||||||
return terrno != 0 ? terrno : -1;
|
return terrno != 0 ? terrno : -1;
|
||||||
}
|
}
|
||||||
|
|
@ -260,7 +256,7 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
||||||
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeMsg);
|
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeMsg);
|
||||||
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
|
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
|
||||||
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeApplyMsg);
|
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyMsg);
|
||||||
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
||||||
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
|
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
|
||||||
pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue);
|
pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue);
|
||||||
|
|
@ -277,7 +273,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
|
|
||||||
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
|
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
|
||||||
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
|
tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ);
|
||||||
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
|
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
|
||||||
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
||||||
tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
||||||
|
|
@ -309,6 +305,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
||||||
pWPool->max = tsNumOfVnodeWriteThreads;
|
pWPool->max = tsNumOfVnodeWriteThreads;
|
||||||
if (tWWorkerInit(pWPool) != 0) return -1;
|
if (tWWorkerInit(pWPool) != 0) return -1;
|
||||||
|
|
||||||
|
SWWorkerPool *pAPool = &pMgmt->applyPool;
|
||||||
|
pAPool->name = "vnode-apply";
|
||||||
|
pAPool->max = tsNumOfVnodeWriteThreads;
|
||||||
|
if (tWWorkerInit(pAPool) != 0) return -1;
|
||||||
|
|
||||||
SWWorkerPool *pSPool = &pMgmt->syncPool;
|
SWWorkerPool *pSPool = &pMgmt->syncPool;
|
||||||
pSPool->name = "vnode-sync";
|
pSPool->name = "vnode-sync";
|
||||||
pSPool->max = tsNumOfVnodeSyncThreads;
|
pSPool->max = tsNumOfVnodeSyncThreads;
|
||||||
|
|
@ -345,6 +346,7 @@ void vmStopWorker(SVnodeMgmt *pMgmt) {
|
||||||
tSingleWorkerCleanup(&pMgmt->monitorWorker);
|
tSingleWorkerCleanup(&pMgmt->monitorWorker);
|
||||||
tSingleWorkerCleanup(&pMgmt->mgmtWorker);
|
tSingleWorkerCleanup(&pMgmt->mgmtWorker);
|
||||||
tWWorkerCleanup(&pMgmt->writePool);
|
tWWorkerCleanup(&pMgmt->writePool);
|
||||||
|
tWWorkerCleanup(&pMgmt->applyPool);
|
||||||
tWWorkerCleanup(&pMgmt->syncPool);
|
tWWorkerCleanup(&pMgmt->syncPool);
|
||||||
tQWorkerCleanup(&pMgmt->queryPool);
|
tQWorkerCleanup(&pMgmt->queryPool);
|
||||||
tQWorkerCleanup(&pMgmt->fetchPool);
|
tQWorkerCleanup(&pMgmt->fetchPool);
|
||||||
|
|
|
||||||
|
|
@ -448,7 +448,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamDispatchRsp* pRsp = pMsg->pCont;
|
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t taskId = pRsp->taskId;
|
int32_t taskId = pRsp->taskId;
|
||||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||||
streamProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp);
|
streamProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp);
|
||||||
|
|
|
||||||
|
|
@ -688,6 +688,10 @@ typedef struct SSortedMergeOperatorInfo {
|
||||||
int32_t numOfResPerPage;
|
int32_t numOfResPerPage;
|
||||||
char** groupVal;
|
char** groupVal;
|
||||||
SArray *groupInfo;
|
SArray *groupInfo;
|
||||||
|
|
||||||
|
bool hasGroupId;
|
||||||
|
uint64_t groupId;
|
||||||
|
STupleHandle* prefetchedTuple;
|
||||||
} SSortedMergeOperatorInfo;
|
} SSortedMergeOperatorInfo;
|
||||||
|
|
||||||
typedef struct SSortOperatorInfo {
|
typedef struct SSortOperatorInfo {
|
||||||
|
|
@ -700,6 +704,10 @@ typedef struct SSortOperatorInfo {
|
||||||
|
|
||||||
int64_t startTs; // sort start time
|
int64_t startTs; // sort start time
|
||||||
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
|
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
|
||||||
|
|
||||||
|
STupleHandle *prefetchedTuple;
|
||||||
|
bool hasGroupId;
|
||||||
|
uint64_t groupId;
|
||||||
} SSortOperatorInfo;
|
} SSortOperatorInfo;
|
||||||
|
|
||||||
typedef struct STagFilterOperatorInfo {
|
typedef struct STagFilterOperatorInfo {
|
||||||
|
|
@ -759,7 +767,7 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData
|
||||||
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
|
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
|
||||||
SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
|
SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
|
||||||
|
|
||||||
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo);
|
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, SSortOperatorInfo* pInfo);
|
||||||
SSDataBlock* loadNextDataBlock(void* param);
|
SSDataBlock* loadNextDataBlock(void* param);
|
||||||
|
|
||||||
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
|
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
|
||||||
|
|
|
||||||
|
|
@ -3103,6 +3103,68 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
|
||||||
return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL;
|
return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
|
||||||
|
SSortedMergeOperatorInfo *pInfo) {
|
||||||
|
blockDataCleanup(pDataBlock);
|
||||||
|
|
||||||
|
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
||||||
|
if (p == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataEnsureCapacity(p, capacity);
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
STupleHandle* pTupleHandle = NULL;
|
||||||
|
if (pInfo->prefetchedTuple == NULL) {
|
||||||
|
pTupleHandle = tsortNextTuple(pHandle);
|
||||||
|
} else {
|
||||||
|
pTupleHandle = pInfo->prefetchedTuple;
|
||||||
|
pInfo->groupId = tsortGetGroupId(pTupleHandle);
|
||||||
|
pInfo->prefetchedTuple = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTupleHandle == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
|
||||||
|
if (!pInfo->hasGroupId) {
|
||||||
|
pInfo->groupId = tupleGroupId;
|
||||||
|
pInfo->hasGroupId = true;
|
||||||
|
appendOneRowToDataBlock(p, pTupleHandle);
|
||||||
|
} else if (pInfo->groupId == tupleGroupId) {
|
||||||
|
appendOneRowToDataBlock(p, pTupleHandle);
|
||||||
|
} else {
|
||||||
|
pInfo->prefetchedTuple = pTupleHandle;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p->info.rows >= capacity) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p->info.rows > 0) {
|
||||||
|
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i);
|
||||||
|
ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
|
||||||
|
|
||||||
|
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
|
||||||
|
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId);
|
||||||
|
colDataAssign(pDst, pSrc, p->info.rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
pDataBlock->info.rows = p->info.rows;
|
||||||
|
pDataBlock->info.capacity = p->info.rows;
|
||||||
|
pDataBlock->info.groupId = pInfo->groupId;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataDestroy(p);
|
||||||
|
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
|
static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
@ -3111,7 +3173,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SSortedMergeOperatorInfo* pInfo = pOperator->info;
|
SSortedMergeOperatorInfo* pInfo = pOperator->info;
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL);
|
return getSortedMergeBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL, pInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
|
|
|
||||||
|
|
@ -737,8 +737,8 @@ static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
|
||||||
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
|
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
|
||||||
SSDataBlock* pSDB = pInfo->pUpdateRes;
|
SSDataBlock* pSDB = pInfo->pUpdateRes;
|
||||||
STimeWindow win = {
|
STimeWindow win = {
|
||||||
.skey = INT64_MIN,
|
.skey = INT64_MIN,
|
||||||
.ekey = INT64_MAX,
|
.ekey = INT64_MAX,
|
||||||
};
|
};
|
||||||
bool needRead = false;
|
bool needRead = false;
|
||||||
if (!isStateWindow(pInfo) && pInfo->updateResIndex < pSDB->info.rows) {
|
if (!isStateWindow(pInfo) && pInfo->updateResIndex < pSDB->info.rows) {
|
||||||
|
|
@ -846,7 +846,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
||||||
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||||
if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
|
if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
|
||||||
if (pInfo->validBlockIndex >= total) {
|
if (pInfo->validBlockIndex >= total) {
|
||||||
doClearBufferedBlocks(pInfo);
|
/*doClearBufferedBlocks(pInfo);*/
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -42,6 +42,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
|
||||||
|
|
||||||
pInfo->pSortInfo = pSortInfo;
|
pInfo->pSortInfo = pSortInfo;
|
||||||
pInfo->pColMatchInfo = pColMatchColInfo;
|
pInfo->pColMatchInfo = pColMatchColInfo;
|
||||||
|
pInfo->hasGroupId = false;
|
||||||
|
pInfo->prefetchedTuple = NULL;
|
||||||
pOperator->name = "SortOperator";
|
pOperator->name = "SortOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
|
||||||
pOperator->blocking = true;
|
pOperator->blocking = true;
|
||||||
|
|
@ -81,8 +83,8 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
|
||||||
pBlock->info.rows += 1;
|
pBlock->info.rows += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
|
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
|
||||||
SArray* pColMatchInfo) {
|
SSortOperatorInfo* pInfo) {
|
||||||
blockDataCleanup(pDataBlock);
|
blockDataCleanup(pDataBlock);
|
||||||
|
|
||||||
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
||||||
|
|
@ -93,14 +95,33 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
|
||||||
blockDataEnsureCapacity(p, capacity);
|
blockDataEnsureCapacity(p, capacity);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
STupleHandle* pTupleHandle = NULL;
|
||||||
|
if (pInfo->prefetchedTuple == NULL) {
|
||||||
|
pTupleHandle = tsortNextTuple(pHandle);
|
||||||
|
} else {
|
||||||
|
pTupleHandle = pInfo->prefetchedTuple;
|
||||||
|
pInfo->groupId = tsortGetGroupId(pTupleHandle);
|
||||||
|
pInfo->prefetchedTuple = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
if (pTupleHandle == NULL) {
|
if (pTupleHandle == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
appendOneRowToDataBlock(p, pTupleHandle);
|
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
|
||||||
|
if (!pInfo->hasGroupId) {
|
||||||
|
pInfo->groupId = tupleGroupId;
|
||||||
|
pInfo->hasGroupId = true;
|
||||||
|
appendOneRowToDataBlock(p, pTupleHandle);
|
||||||
|
} else if (pInfo->groupId == tupleGroupId) {
|
||||||
|
appendOneRowToDataBlock(p, pTupleHandle);
|
||||||
|
} else {
|
||||||
|
pInfo->prefetchedTuple = pTupleHandle;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (p->info.rows >= capacity) {
|
if (p->info.rows >= capacity) {
|
||||||
return pDataBlock;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -117,6 +138,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
|
||||||
|
|
||||||
pDataBlock->info.rows = p->info.rows;
|
pDataBlock->info.rows = p->info.rows;
|
||||||
pDataBlock->info.capacity = p->info.rows;
|
pDataBlock->info.capacity = p->info.rows;
|
||||||
|
pDataBlock->info.groupId = pInfo->groupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataDestroy(p);
|
blockDataDestroy(p);
|
||||||
|
|
@ -188,8 +210,8 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock =
|
SSDataBlock* pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
|
||||||
getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
|
pInfo->pColMatchInfo, pInfo);
|
||||||
|
|
||||||
if (pBlock != NULL) {
|
if (pBlock != NULL) {
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
|
|
@ -230,11 +252,11 @@ typedef struct SMultiwaySortMergeOperatorInfo {
|
||||||
SArray* pColMatchInfo; // for index map from table scan output
|
SArray* pColMatchInfo; // for index map from table scan output
|
||||||
|
|
||||||
SSDataBlock* pInputBlock;
|
SSDataBlock* pInputBlock;
|
||||||
int64_t startTs; // sort start time
|
int64_t startTs; // sort start time
|
||||||
|
|
||||||
bool hasGroupId;
|
bool hasGroupId;
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
STupleHandle *prefetchedTuple;
|
STupleHandle* prefetchedTuple;
|
||||||
} SMultiwaySortMergeOperatorInfo;
|
} SMultiwaySortMergeOperatorInfo;
|
||||||
|
|
||||||
int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
||||||
|
|
@ -274,7 +296,7 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
|
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
|
||||||
SArray* pColMatchInfo, SMultiwaySortMergeOperatorInfo* pInfo) {
|
SArray* pColMatchInfo, SMultiwaySortMergeOperatorInfo* pInfo) {
|
||||||
blockDataCleanup(pDataBlock);
|
blockDataCleanup(pDataBlock);
|
||||||
|
|
||||||
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
||||||
|
|
@ -285,7 +307,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
||||||
blockDataEnsureCapacity(p, capacity);
|
blockDataEnsureCapacity(p, capacity);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
|
||||||
STupleHandle* pTupleHandle = NULL;
|
STupleHandle* pTupleHandle = NULL;
|
||||||
if (pInfo->prefetchedTuple == NULL) {
|
if (pInfo->prefetchedTuple == NULL) {
|
||||||
pTupleHandle = tsortNextTuple(pHandle);
|
pTupleHandle = tsortNextTuple(pHandle);
|
||||||
|
|
@ -314,7 +335,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
||||||
if (p->info.rows >= capacity) {
|
if (p->info.rows >= capacity) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (p->info.rows > 0) {
|
if (p->info.rows > 0) {
|
||||||
|
|
@ -337,7 +357,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
||||||
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
|
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
|
SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
@ -351,12 +370,8 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock =
|
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes,
|
||||||
getMultiwaySortedBlockData(pInfo->pSortHandle,
|
pOperator->resultInfo.capacity, pInfo->pColMatchInfo, pInfo);
|
||||||
pInfo->binfo.pRes,
|
|
||||||
pOperator->resultInfo.capacity,
|
|
||||||
pInfo->pColMatchInfo,
|
|
||||||
pInfo);
|
|
||||||
|
|
||||||
if (pBlock != NULL) {
|
if (pBlock != NULL) {
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
|
|
@ -367,7 +382,7 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) {
|
void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
SMultiwaySortMergeOperatorInfo * pInfo = (SMultiwaySortMergeOperatorInfo*)param;
|
SMultiwaySortMergeOperatorInfo* pInfo = (SMultiwaySortMergeOperatorInfo*)param;
|
||||||
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
|
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
|
||||||
pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock);
|
pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock);
|
||||||
|
|
||||||
|
|
@ -387,9 +402,9 @@ int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrEx
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SSDataBlock* pInputBlock,
|
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams,
|
||||||
SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo,
|
SSDataBlock* pInputBlock, SSDataBlock* pResBlock, SArray* pSortInfo,
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SArray* pColMatchColInfo, SExecTaskInfo* pTaskInfo) {
|
||||||
SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo));
|
SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
int32_t rowSize = pResBlock->info.rowSize;
|
int32_t rowSize = pResBlock->info.rowSize;
|
||||||
|
|
@ -413,7 +428,8 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
|
||||||
|
|
||||||
pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2;
|
pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2;
|
||||||
pInfo->sortBufSize = pInfo->bufPageSize * 16;
|
pInfo->sortBufSize = pInfo->bufPageSize * 16;
|
||||||
|
pInfo->hasGroupId = false;
|
||||||
|
pInfo->prefetchedTuple = NULL;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
|
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
|
||||||
|
|
|
||||||
|
|
@ -528,16 +528,24 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
|
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
|
||||||
SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
|
SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
|
||||||
taosArrayClear(pHandle->pOrderedSource);
|
taosArrayClear(pHandle->pOrderedSource);
|
||||||
|
|
||||||
|
bool hasGroupId = false;
|
||||||
|
SSDataBlock* prefetchedDataBlock = NULL;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
|
SSDataBlock* pBlock = NULL;
|
||||||
|
if (prefetchedDataBlock == NULL) {
|
||||||
|
pBlock = pHandle->fetchfp(source->param);
|
||||||
|
} else {
|
||||||
|
pBlock = prefetchedDataBlock;
|
||||||
|
prefetchedDataBlock = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHandle->pDataBlock == NULL) {
|
if (!hasGroupId) {
|
||||||
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
|
||||||
|
|
||||||
// calculate the buffer pages according to the total available buffers.
|
// calculate the buffer pages according to the total available buffers.
|
||||||
int32_t rowSize = blockDataGetRowSize(pBlock);
|
int32_t rowSize = blockDataGetRowSize(pBlock);
|
||||||
if (rowSize * 4 > 4096) {
|
if (rowSize * 4 > 4096) {
|
||||||
|
|
@ -549,29 +557,36 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
// todo!!
|
// todo!!
|
||||||
pHandle->numOfPages = 1024;
|
pHandle->numOfPages = 1024;
|
||||||
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||||
|
|
||||||
|
hasGroupId = true;
|
||||||
|
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
// perform the scalar function calculation before apply the sort
|
if (pHandle->pDataBlock->info.groupId == pBlock->info.groupId) {
|
||||||
if (pHandle->beforeFp != NULL) {
|
// perform the scalar function calculation before apply the sort
|
||||||
pHandle->beforeFp(pBlock, pHandle->param);
|
if (pHandle->beforeFp != NULL) {
|
||||||
}
|
pHandle->beforeFp(pBlock, pHandle->param);
|
||||||
|
}
|
||||||
|
// todo relocate the columns
|
||||||
|
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
||||||
|
if (code != 0) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
// todo relocate the columns
|
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
||||||
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
if (size > sortBufSize) {
|
||||||
if (code != 0) {
|
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||||
return code;
|
int64_t p = taosGetTimestampUs();
|
||||||
}
|
blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||||
|
|
||||||
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
int64_t el = taosGetTimestampUs() - p;
|
||||||
if (size > sortBufSize) {
|
pHandle->sortElapsed += el;
|
||||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
|
||||||
int64_t p = taosGetTimestampUs();
|
|
||||||
blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
|
||||||
|
|
||||||
int64_t el = taosGetTimestampUs() - p;
|
doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||||
pHandle->sortElapsed += el;
|
}
|
||||||
|
} else {
|
||||||
doAddToBuf(pHandle->pDataBlock, pHandle);
|
prefetchedDataBlock = pBlock;
|
||||||
|
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -78,7 +78,6 @@ bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultI
|
||||||
int32_t percentileFunction(SqlFunctionCtx *pCtx);
|
int32_t percentileFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
int32_t getApercentileMaxSize();
|
|
||||||
bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool apercentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool apercentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t apercentileFunction(SqlFunctionCtx *pCtx);
|
int32_t apercentileFunction(SqlFunctionCtx *pCtx);
|
||||||
|
|
@ -86,6 +85,7 @@ int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx);
|
||||||
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
|
int32_t getApercentileMaxSize();
|
||||||
|
|
||||||
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
|
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
|
||||||
|
|
@ -103,13 +103,13 @@ int32_t topFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t bottomFunction(SqlFunctionCtx *pCtx);
|
int32_t bottomFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
int32_t getSpreadInfoSize();
|
|
||||||
bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t spreadFunction(SqlFunctionCtx* pCtx);
|
int32_t spreadFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx);
|
int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx);
|
||||||
int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
int32_t getSpreadInfoSize();
|
||||||
|
|
||||||
bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
|
|
@ -119,7 +119,10 @@ int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t histogramFunction(SqlFunctionCtx* pCtx);
|
int32_t histogramFunction(SqlFunctionCtx* pCtx);
|
||||||
|
int32_t histogramFunctionMerge(SqlFunctionCtx* pCtx);
|
||||||
int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
int32_t getHistogramInfoSize();
|
||||||
|
|
||||||
bool getHLLFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getHLLFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
int32_t hllFunction(SqlFunctionCtx* pCtx);
|
int32_t hllFunction(SqlFunctionCtx* pCtx);
|
||||||
|
|
|
||||||
|
|
@ -503,6 +503,58 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateHistogramImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
|
||||||
|
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||||
|
if (isPartial) {
|
||||||
|
if (4 != numOfParams) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||||
|
if (!IS_NUMERIC_TYPE(colType)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
// param1 ~ param3
|
||||||
|
for (int32_t i = 1; i < numOfParams; ++i) {
|
||||||
|
SNode* pParamNode = nodesListGetNode(pFunc->pParameterList, i);
|
||||||
|
if (QUERY_NODE_VALUE != nodeType(pParamNode)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
SValueNode* pValue = (SValueNode*)pParamNode;
|
||||||
|
|
||||||
|
pValue->notReserved = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type != TSDB_DATA_TYPE_BINARY ||
|
||||||
|
((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type != TSDB_DATA_TYPE_BINARY ||
|
||||||
|
((SExprNode*)nodesListGetNode(pFunc->pParameterList, 3))->resType.type != TSDB_DATA_TYPE_BIGINT) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = getHistogramInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
|
||||||
|
} else {
|
||||||
|
if (1 != numOfParams) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type != TSDB_DATA_TYPE_BINARY) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = 512, .type = TSDB_DATA_TYPE_BINARY};
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t translateHistogramPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
return translateHistogramImpl(pFunc, pErrBuf, len, true);
|
||||||
|
}
|
||||||
|
static int32_t translateHistogramMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
return translateHistogramImpl(pFunc, pErrBuf, len, false);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateHLL(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateHLL(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
|
@ -1394,6 +1446,28 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getHistogramFuncEnv,
|
.getEnvFunc = getHistogramFuncEnv,
|
||||||
.initFunc = histogramFunctionSetup,
|
.initFunc = histogramFunctionSetup,
|
||||||
.processFunc = histogramFunction,
|
.processFunc = histogramFunction,
|
||||||
|
.finalizeFunc = histogramFinalize,
|
||||||
|
.pPartialFunc = "_histogram_partial",
|
||||||
|
.pMergeFunc = "_histogram_merge"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "_histogram_partial",
|
||||||
|
.type = FUNCTION_TYPE_HISTOGRAM_PARTIAL,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.translateFunc = translateHistogramPartial,
|
||||||
|
.getEnvFunc = getHistogramFuncEnv,
|
||||||
|
.initFunc = histogramFunctionSetup,
|
||||||
|
.processFunc = histogramFunction,
|
||||||
|
.finalizeFunc = histogramPartialFinalize
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "_histogram_merge",
|
||||||
|
.type = FUNCTION_TYPE_HISTOGRAM_MERGE,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.translateFunc = translateHistogramMerge,
|
||||||
|
.getEnvFunc = getHistogramFuncEnv,
|
||||||
|
.initFunc = functionSetup,
|
||||||
|
.processFunc = histogramFunctionMerge,
|
||||||
.finalizeFunc = histogramFinalize
|
.finalizeFunc = histogramFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -2187,9 +2187,7 @@ int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
int32_t bytesHist = (int32_t)(sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
int32_t resultBytes = getApercentileMaxSize();
|
||||||
int32_t bytesDigest = (int32_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
|
|
||||||
int32_t resultBytes = TMAX(bytesHist, bytesDigest);
|
|
||||||
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||||
|
|
||||||
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||||
|
|
@ -3138,6 +3136,10 @@ int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return functionFinalize(pCtx, pBlock);
|
return functionFinalize(pCtx, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t getHistogramInfoSize() {
|
||||||
|
return (int32_t)sizeof(SHistoFuncInfo) + HISTOGRAM_MAX_BINS_NUM * sizeof(SHistoFuncBin);
|
||||||
|
}
|
||||||
|
|
||||||
bool getHistogramFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
bool getHistogramFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(SHistoFuncInfo) + HISTOGRAM_MAX_BINS_NUM * sizeof(SHistoFuncBin);
|
pEnv->calcMemSize = sizeof(SHistoFuncInfo) + HISTOGRAM_MAX_BINS_NUM * sizeof(SHistoFuncBin);
|
||||||
return true;
|
return true;
|
||||||
|
|
@ -3348,6 +3350,30 @@ int32_t histogramFunction(SqlFunctionCtx *pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t histogramFunctionMerge(SqlFunctionCtx *pCtx) {
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
||||||
|
|
||||||
|
SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
|
char* data = colDataGetData(pCol, start);
|
||||||
|
SHistoFuncInfo* pInputInfo = (SHistoFuncInfo *)varDataVal(data);
|
||||||
|
|
||||||
|
pInfo->normalized = pInputInfo->normalized;
|
||||||
|
pInfo->numOfBins = pInputInfo->numOfBins;
|
||||||
|
pInfo->totalCount += pInputInfo->totalCount;
|
||||||
|
for (int32_t k = 0; k < pInfo->numOfBins; ++k) {
|
||||||
|
pInfo->bins[k].lower = pInputInfo->bins[k].lower;
|
||||||
|
pInfo->bins[k].upper = pInputInfo->bins[k].upper;
|
||||||
|
pInfo->bins[k].count += pInputInfo->bins[k].count;
|
||||||
|
}
|
||||||
|
|
||||||
|
SET_VAL(GET_RES_INFO(pCtx), pInfo->numOfBins, pInfo->numOfBins);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
|
@ -3384,6 +3410,24 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return pResInfo->numOfRes;
|
return pResInfo->numOfRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
int32_t resultBytes = getHistogramInfoSize();
|
||||||
|
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||||
|
|
||||||
|
memcpy(varDataVal(res), pInfo, resultBytes);
|
||||||
|
varDataSetLen(res, resultBytes);
|
||||||
|
|
||||||
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
colDataAppend(pCol, pBlock->info.rows, res, false);
|
||||||
|
|
||||||
|
taosMemoryFree(res);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
bool getHLLFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
bool getHLLFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(SHLLInfo);
|
pEnv->calcMemSize = sizeof(SHLLInfo);
|
||||||
return true;
|
return true;
|
||||||
|
|
|
||||||
Binary file not shown.
|
|
@ -901,12 +901,22 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
|
||||||
|
|
||||||
if (hasFraction) {
|
if (hasFraction) {
|
||||||
int32_t fracLen = (int32_t)strlen(fraction) + 1;
|
int32_t fracLen = (int32_t)strlen(fraction) + 1;
|
||||||
char *tzInfo = strchr(buf, '+');
|
|
||||||
if (tzInfo) {
|
char *tzInfo;
|
||||||
|
if (buf[len - 1] == 'z' || buf[len - 1] == 'Z') {
|
||||||
|
tzInfo = &buf[len - 1];
|
||||||
memmove(tzInfo + fracLen, tzInfo, strlen(tzInfo));
|
memmove(tzInfo + fracLen, tzInfo, strlen(tzInfo));
|
||||||
} else {
|
} else {
|
||||||
tzInfo = strchr(buf, '-');
|
tzInfo = strchr(buf, '+');
|
||||||
memmove(tzInfo + fracLen, tzInfo, strlen(tzInfo));
|
if (tzInfo) {
|
||||||
|
memmove(tzInfo + fracLen, tzInfo, strlen(tzInfo));
|
||||||
|
} else {
|
||||||
|
//search '-' backwards
|
||||||
|
tzInfo = strrchr(buf, '-');
|
||||||
|
if (tzInfo) {
|
||||||
|
memmove(tzInfo + fracLen, tzInfo, strlen(tzInfo));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
char tmp[32] = {0};
|
char tmp[32] = {0};
|
||||||
|
|
|
||||||
|
|
@ -59,7 +59,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
|
||||||
|
|
||||||
// rsp by input status
|
// rsp by input status
|
||||||
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
|
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
|
||||||
((SMsgHead*)buf)->vgId = htonl(pReq->sourceVg);
|
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
|
||||||
SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
pCont->inputStatus = status;
|
pCont->inputStatus = status;
|
||||||
pCont->streamId = pReq->streamId;
|
pCont->streamId = pReq->streamId;
|
||||||
|
|
@ -78,7 +78,18 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
|
||||||
// 2.1. idle: exec
|
// 2.1. idle: exec
|
||||||
// 2.2. executing: return
|
// 2.2. executing: return
|
||||||
// 2.3. closing: keep trying
|
// 2.3. closing: keep trying
|
||||||
streamExec(pTask, pMsgCb);
|
if (pTask->execType != TASK_EXEC__NONE) {
|
||||||
|
streamExec(pTask, pMsgCb);
|
||||||
|
} else {
|
||||||
|
ASSERT(pTask->sinkType != TASK_SINK__NONE);
|
||||||
|
while (1) {
|
||||||
|
void* data = streamQueueNextItem(pTask->inputQueue);
|
||||||
|
if (data == NULL) return 0;
|
||||||
|
if (streamTaskOutput(pTask, data) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 3. handle output
|
// 3. handle output
|
||||||
// 3.1 check and set status
|
// 3.1 check and set status
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
|
||||||
if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1;
|
if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pReq->sourceChildId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pReq->sourceChildId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
|
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
|
||||||
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
|
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
|
||||||
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
|
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
|
||||||
|
|
@ -42,6 +43,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
||||||
if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pReq->sourceChildId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pReq->sourceChildId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
|
||||||
ASSERT(pReq->blockNum > 0);
|
ASSERT(pReq->blockNum > 0);
|
||||||
pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
|
pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
|
||||||
|
|
@ -94,6 +96,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
|
||||||
.sourceTaskId = pTask->taskId,
|
.sourceTaskId = pTask->taskId,
|
||||||
.sourceVg = data->sourceVg,
|
.sourceVg = data->sourceVg,
|
||||||
.sourceChildId = pTask->childId,
|
.sourceChildId = pTask->childId,
|
||||||
|
.upstreamNodeId = pTask->nodeId,
|
||||||
.blockNum = blockNum,
|
.blockNum = blockNum,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -184,13 +187,17 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
|
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
|
||||||
if (pBlock == NULL) return 0;
|
if (pBlock == NULL) {
|
||||||
|
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK);
|
ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK);
|
||||||
|
|
||||||
SRpcMsg dispatchMsg = {0};
|
SRpcMsg dispatchMsg = {0};
|
||||||
SEpSet* pEpSet = NULL;
|
SEpSet* pEpSet = NULL;
|
||||||
if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) {
|
if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -65,6 +65,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
|
||||||
}
|
}
|
||||||
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||||
qRes->blocks = pRes;
|
qRes->blocks = pRes;
|
||||||
|
/*qRes->sourceVg = pTask->nodeId;*/
|
||||||
if (streamTaskOutput(pTask, qRes) < 0) {
|
if (streamTaskOutput(pTask, qRes) < 0) {
|
||||||
streamQueueProcessFail(pTask->inputQueue);
|
streamQueueProcessFail(pTask->inputQueue);
|
||||||
taosArrayDestroy(pRes);
|
taosArrayDestroy(pRes);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue