Merge pull request #13561 from taosdata/feature/stream
enh(stream): use same name rule with schemaless
This commit is contained in:
commit
428966dec0
|
@ -71,7 +71,7 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
|
|||
#define colDataGetData(p1_, r_) \
|
||||
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_))
|
||||
|
||||
#define IS_JSON_NULL(type,data) ((type) == TSDB_DATA_TYPE_JSON && *(data) == TSDB_DATA_TYPE_NULL)
|
||||
#define IS_JSON_NULL(type, data) ((type) == TSDB_DATA_TYPE_JSON && *(data) == TSDB_DATA_TYPE_NULL)
|
||||
|
||||
static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) {
|
||||
if (!pColumnInfoData->hasNull) {
|
||||
|
@ -180,7 +180,7 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
|
|||
*(double*)p = *(double*)v;
|
||||
}
|
||||
|
||||
int32_t getJsonValueLen(const char *data);
|
||||
int32_t getJsonValueLen(const char* data);
|
||||
|
||||
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
|
||||
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
|
||||
|
@ -223,7 +223,8 @@ int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
|
|||
|
||||
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
|
||||
|
||||
void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress);
|
||||
void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
|
||||
int8_t needCompress);
|
||||
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);
|
||||
|
||||
void blockDebugShowData(const SArray* dataBlocks, const char* flag);
|
||||
|
@ -231,6 +232,8 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag);
|
|||
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
|
||||
tb_uid_t suid);
|
||||
|
||||
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
|
||||
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
|
||||
const char* stbFullName, int32_t vgId);
|
||||
|
||||
|
|
|
@ -145,26 +145,6 @@ SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit);
|
|||
#if 0
|
||||
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput);
|
||||
void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
|
||||
|
||||
static FORCE_INLINE int32_t streamEnqueue1(SStreamQueue* queue, SStreamQueueItem* pItem) {
|
||||
int8_t inputStatus = atomic_load_8(&queue->enqueueStatus);
|
||||
if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
|
||||
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
|
||||
SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
|
||||
if (pSubmitClone == NULL) {
|
||||
atomic_store_8(&queue->enqueueStatus, TASK_INPUT_STATUS__FAILED);
|
||||
return -1;
|
||||
}
|
||||
taosWriteQitem(queue->queue, pSubmitClone);
|
||||
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK) {
|
||||
taosWriteQitem(queue->queue, pItem);
|
||||
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
|
||||
taosWriteQitem(queue->queue, pItem);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -195,7 +195,7 @@ typedef struct {
|
|||
|
||||
tmq_conf_t* tmq_conf_new() {
|
||||
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
|
||||
conf->withTbName = -1;
|
||||
conf->withTbName = false;
|
||||
conf->autoCommit = true;
|
||||
conf->autoCommitInterval = 5000;
|
||||
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
|
||||
|
@ -256,13 +256,10 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
|
||||
if (strcmp(key, "msg.with.table.name") == 0) {
|
||||
if (strcmp(value, "true") == 0) {
|
||||
conf->withTbName = 1;
|
||||
conf->withTbName = true;
|
||||
return TMQ_CONF_OK;
|
||||
} else if (strcmp(value, "false") == 0) {
|
||||
conf->withTbName = 0;
|
||||
return TMQ_CONF_OK;
|
||||
} else if (strcmp(value, "none") == 0) {
|
||||
conf->withTbName = -1;
|
||||
conf->withTbName = false;
|
||||
return TMQ_CONF_OK;
|
||||
} else {
|
||||
return TMQ_CONF_INVALID;
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "tcompare.h"
|
||||
#include "tglobal.h"
|
||||
#include "tlog.h"
|
||||
#include "tname.h"
|
||||
|
||||
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) {
|
||||
pEp->port = 0;
|
||||
|
@ -99,7 +100,7 @@ void colDataTrim(SColumnInfoData* pColumnInfoData) {
|
|||
// TODO
|
||||
}
|
||||
|
||||
int32_t getJsonValueLen(const char *data) {
|
||||
int32_t getJsonValueLen(const char* data) {
|
||||
int32_t dataLen = 0;
|
||||
if (*data == TSDB_DATA_TYPE_NULL) {
|
||||
dataLen = CHAR_BYTES;
|
||||
|
@ -109,7 +110,7 @@ int32_t getJsonValueLen(const char *data) {
|
|||
dataLen = DOUBLE_BYTES + CHAR_BYTES;
|
||||
} else if (*data == TSDB_DATA_TYPE_BOOL) {
|
||||
dataLen = CHAR_BYTES + CHAR_BYTES;
|
||||
} else if (*data & TD_TAG_JSON) { // json string
|
||||
} else if (*data & TD_TAG_JSON) { // json string
|
||||
dataLen = ((STag*)(data))->len;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
|
@ -137,7 +138,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
|
|||
int32_t dataLen = 0;
|
||||
if (type == TSDB_DATA_TYPE_JSON) {
|
||||
dataLen = getJsonValueLen(pData);
|
||||
}else {
|
||||
} else {
|
||||
dataLen = varDataTLen(pData);
|
||||
}
|
||||
|
||||
|
@ -1283,7 +1284,7 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
|
|||
if (n % 8 == 0) {
|
||||
memmove(nullBitmap, nullBitmap + n / 8, newLen);
|
||||
} else {
|
||||
int32_t tail = n % 8;
|
||||
int32_t tail = n % 8;
|
||||
int32_t i = 0;
|
||||
uint8_t* p = (uint8_t*)nullBitmap;
|
||||
|
||||
|
@ -1301,7 +1302,7 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
|
|||
}
|
||||
} else if (n > 8) {
|
||||
int32_t gap = len - newLen;
|
||||
while(i < newLen) {
|
||||
while (i < newLen) {
|
||||
uint8_t v = p[i + gap];
|
||||
p[i] = (v << tail);
|
||||
|
||||
|
@ -1316,7 +1317,6 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
|
||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t));
|
||||
|
@ -1544,7 +1544,8 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
|
|||
*
|
||||
* TODO: colId should be set
|
||||
*/
|
||||
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t suid) {
|
||||
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
|
||||
tb_uid_t suid) {
|
||||
int32_t sz = taosArrayGetSize(pDataBlocks);
|
||||
int32_t bufSize = sizeof(SSubmitReq);
|
||||
for (int32_t i = 0; i < sz; ++i) {
|
||||
|
@ -1585,12 +1586,12 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
|||
int32_t dataLen = 0;
|
||||
for (int32_t j = 0; j < rows; ++j) { // iterate by row
|
||||
tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf
|
||||
bool isStartKey = false;
|
||||
bool isStartKey = false;
|
||||
int32_t offset = 0;
|
||||
for (int32_t k = 0; k < colNum; ++k) { // iterate by column
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||
STColumn* pCol = &pTSchema->columns[k];
|
||||
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
|
||||
STColumn* pCol = &pTSchema->columns[k];
|
||||
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
|
||||
switch (pColInfoData->info.type) {
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
if (!isStartKey) {
|
||||
|
@ -1599,15 +1600,18 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
|||
offset, k);
|
||||
|
||||
} else {
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, offset, k);
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var,
|
||||
true, offset, k);
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR: {
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true, offset, k);
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true,
|
||||
offset, k);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true, offset, k);
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true,
|
||||
offset, k);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
|
@ -1645,7 +1649,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
|||
}
|
||||
break;
|
||||
}
|
||||
offset += TYPE_BYTES[pCol->type]; // sum/avg would convert to int64_t/uint64_t/double during aggregation
|
||||
offset += TYPE_BYTES[pCol->type]; // sum/avg would convert to int64_t/uint64_t/double during aggregation
|
||||
}
|
||||
dataLen += TD_ROW_LEN(rb.pBuf);
|
||||
#ifdef TD_DEBUG_PRINT_ROW
|
||||
|
@ -1681,11 +1685,38 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) {
|
||||
SArray* tags = taosArrayInit(0, sizeof(void*));
|
||||
SSmlKv* pTag = taosMemoryCalloc(1, sizeof(SSmlKv));
|
||||
pTag->key = "group_id";
|
||||
pTag->keyLen = strlen(pTag->key);
|
||||
pTag->type = TSDB_DATA_TYPE_UBIGINT;
|
||||
pTag->u = groupId;
|
||||
taosArrayPush(tags, &pTag);
|
||||
|
||||
void* cname = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
|
||||
|
||||
RandTableName rname = {
|
||||
.tags = tags,
|
||||
.sTableName = stbName,
|
||||
.sTableNameLen = strlen(stbName),
|
||||
.childTableName = cname,
|
||||
};
|
||||
|
||||
buildChildTableName(&rname);
|
||||
|
||||
taosMemoryFree(pTag);
|
||||
taosArrayDestroy(tags);
|
||||
|
||||
ASSERT(rname.childTableName && rname.childTableName[0]);
|
||||
return rname.childTableName;
|
||||
}
|
||||
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
|
||||
const char* stbFullName, int32_t vgId) {
|
||||
SSubmitReq* ret = NULL;
|
||||
SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
|
||||
if(!tagArray) {
|
||||
if (!tagArray) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
@ -1703,15 +1734,12 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
|||
|
||||
if (createTb) {
|
||||
SVCreateTbReq createTbReq = {0};
|
||||
char* cname = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
|
||||
snprintf(cname, TSDB_TABLE_FNAME_LEN, "%s:%ld", stbFullName, pDataBlock->info.groupId);
|
||||
char* cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
|
||||
createTbReq.name = cname;
|
||||
createTbReq.flags = 0;
|
||||
createTbReq.type = TSDB_CHILD_TABLE;
|
||||
createTbReq.ctb.suid = suid;
|
||||
|
||||
|
||||
|
||||
STagVal tagVal = {.cid = 1,
|
||||
.type = TSDB_DATA_TYPE_UBIGINT,
|
||||
.pData = (uint8_t*)&pDataBlock->info.groupId,
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tstream.h"
|
||||
#include "streamInc.h"
|
||||
|
||||
#if 0
|
||||
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput) {
|
||||
|
@ -74,7 +74,6 @@ FAIL:
|
|||
}
|
||||
|
||||
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) {
|
||||
//
|
||||
atomic_add_fetch_32(pDataSubmit->dataRef, 1);
|
||||
}
|
||||
|
||||
|
|
|
@ -116,11 +116,9 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
|
|||
*ppEpSet = &pTask->fixedEpDispatcher.epSet;
|
||||
downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
||||
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||
// TODO get ctbName
|
||||
char ctbName[TSDB_TABLE_FNAME_LEN + 22] = {0};
|
||||
// TODO get ctbName for each block
|
||||
SSDataBlock* pBlock = taosArrayGet(data->blocks, 0);
|
||||
sprintf(ctbName, "%s:%ld", pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId);
|
||||
// get vg and ep
|
||||
char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId);
|
||||
// TODO: get hash function by hashMethod
|
||||
|
||||
// get groupId, compute hash value
|
||||
|
|
Loading…
Reference in New Issue