今天教大家借助一款框架快速实现一个数据库,这个框架就是Calcite,下面会带大家通过两个例子快速教会大家怎么实现,一个是可以通过 SQL 语句的方式可以直接查询文件内容,第二个是模拟 Mysql 查询功能,以及最后告诉大家怎么实现 SQL 查询 Kafka 数据。
Calcite 是一个用于优化异构数据源的查询处理的可插拔基础框架(他是一个框架),可以将任意数据(Any data, Anywhere)DML 转换成基于 SQL 的 DML 引擎,并且我们可以选择性的使用它的部分功能。
当我们需要自建一个数据库的时候,数据可以为任何格式的,比如text、word、xml、mysql、es、csv、第三方接口数据等等,我们只有数据,我们想让这些数据支持 SQL 形式动态增删改查。
另外,像Hive、Drill、Flink、Phoenix 和 Storm 等项目中,数据处理系统都是使用 Calcite 来做 SQL 解析和查询优化,当然,还有部分用来构建自己的 JDBC driver。
就是将标准 SQL(可以理解为Mysql)关键词以及关键词之间的字符串截取出来,每一个token,会被封装为一个SqlNode,SqlNode会衍生很多子类,比如Select会被封装为SqlSelect,当前 SqlNode 也能反解析为 SQL 文本。
某个字段的名称和类型信息
多个 RelDataTypeField 组成了 RelDataType,可以理解为数据行
一个完整的表的信息
所有元数据的组合,可以理解为一组 Table 或者库的概念
org.apache.calcite
calcite-core
1.32.0
model.json 里面主要描述或者说告诉 Calcite 如何创建 Schema,也就是告诉框架怎么创建出库。
{
"version": "1.0",
"defaultSchema": "CSV",
"schemas": [
{
"name": "CSV",
"type": "custom",
"factory": "csv.CsvSchemaFactory",
"operand": {
"directory": "csv"
}
}
]
}
接下来还需要定义一个 csv 文件,用来定义表结构。
NAME:string,MONEY:string
aixiaoxian,10000万
xiaobai,10000万
adong,10000万
maomao,10000万
xixi,10000万
zizi,10000万
wuwu,10000万
kuku,10000万
整个项目的结构大概就是这样:
在上述文件中指定的包路径下去编写 CsvSchemaFactory 类,实现 SchemaFactory 接口,并且实现里面唯一的方法 create 方法,创建Schema(库)。
public class CsvSchemaFactory implements SchemaFactory {
@Override
public Schema create(SchemaPlus parentSchema, String name,
Mapoperand) {
final String directory = (String) operand.get("directory");
File directoryFile = new File(directory);
return new CsvSchema(directoryFile, "scannable");
}
}
有了 SchemaFactory,接下来需要自定义 Schema 类。
自定义的 Schema 需要实现 Schema 接口,但是直接实现要实现的方法太多,我们去实现官方的 AbstractSchema 类,这样就只需要实现一个方法就行(如果有其他定制化需求可以实现原生接口)。
核心的逻辑就是createTableMap方法,用于创建出 Table 表。
他会扫描指定的Resource下面的所有 csv 文件,将每个文件映射成Table对象,最终以map形式返回,Schema接口的其他几个方法会用到这个对象。
@Override
protected MapgetTableMap() {
if (tableMap == null) {
tableMap = createTableMap();
}
return tableMap;
}
private MapcreateTableMap() {
final Source baseSource = Sources.of(directoryFile);
File[] files = directoryFile.listFiles((dir, name) -> {
final String nameSansGz = trim(name, ".gz");
return nameSansGz.endsWith(".csv");
});
if (files == null) {
System.out.println("directory " + directoryFile + " not found");
files = new File[0];
}
final ImmutableMap.Builderbuilder = ImmutableMap.builder();
for (File file : files) {
Source source = Sources.of(file);
final Source sourceSansCsv = source.trimOrNull(".csv");
if (sourceSansCsv != null) {
final Table table = createTable(source);
builder.put(sourceSansCsv.relative(baseSource).path(), table);
}
}
return builder.build();
}
Schema 有了,并且数据文件 csv 也映射成 Table 了,一个 csv 文件对应一个 Table。
接下来我们去自定义 Table,自定义 Table 的核心是我们要定义字段的类型和名称,以及如何读取 csv文件。
先获取数据类型和名称,即单表结构,从csv文件头中获取(当前文件头需要我们自己定义,包括规则我们也可以定制化)。
public abstract class CsvTable extends AbstractTable {
protected final Source source;
protected final @Nullable RelProtoDataType protoRowType;
private @Nullable RelDataType rowType;
private @Nullable ListfieldTypes;
CsvTable(Source source, @Nullable RelProtoDataType protoRowType) {
this.source = source;
this.protoRowType = protoRowType;
}
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
if (protoRowType != null) {
return protoRowType.apply(typeFactory);
}
if (rowType == null) {
rowType = CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
null);
}
return rowType;
}
public ListgetFieldTypes(RelDataTypeFactory typeFactory) {
if (fieldTypes == null) {
fieldTypes = new ArrayList<>();
CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
fieldTypes);
}
return fieldTypes;
}
public static RelDataType deduceRowType(JavaTypeFactory typeFactory,
Source source, @Nullable ListfieldTypes) {
final Listtypes = new ArrayList<>();
final Listnames = new ArrayList<>();
try (CSVReader reader = openCsv(source)) {
String[] strings = reader.readNext();
if (strings == null) {
strings = new String[]{"EmptyFileHasNoColumns:boolean"};
}
for (String string : strings) {
final String name;
final RelDataType fieldType;
final int colon = string.indexOf(':');
if (colon >= 0) {
name = string.substring(0, colon);
String typeString = string.substring(colon + 1);
Matcher decimalMatcher = DECIMAL_TYPE_PATTERN.matcher(typeString);
if (decimalMatcher.matches()) {
int precision = Integer.parseInt(decimalMatcher.group(1));
int scale = Integer.parseInt(decimalMatcher.group(2));
fieldType = parseDecimalSqlType(typeFactory, precision, scale);
} else {
switch (typeString) {
case "string":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
break;
case "boolean":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BOOLEAN);
break;
case "byte":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TINYINT);
break;
case "char":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.CHAR);
break;
case "short":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.SMALLINT);
break;
case "int":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.INTEGER);
break;
case "long":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BIGINT);
break;
case "float":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.REAL);
break;
case "double":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DOUBLE);
break;
case "date":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DATE);
break;
case "timestamp":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIMESTAMP);
break;
case "time":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIME);
break;
default:
LOGGER.warn(
"Found unknown type: {} in file: {} for column: {}. Will assume the type of "
+ "column is string.",
typeString, source.path(), name);
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
break;
}
}
} else {
name = string;
fieldType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
}
names.add(name);
types.add(fieldType);
if (fieldTypes != null) {
fieldTypes.add(fieldType);
}
}
} catch (IOException e) {
}
if (names.isEmpty()) {
names.add("line");
types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR));
}
return typeFactory.createStructType(Pair.zip(names, types));
}
}
获取文件中的数据,上面把Table的表结构字段名称和类型都获取到了以后,就剩最后一步了,获取文件中的数据。我们需要自定义一个类,实现 ScannableTable 接口,并且实现里面唯一的方法 scan 方法,其实本质上就是读文件,然后把文件的每一行的数据和上述获取的 fileType 进行匹配。
@Override
public Enumerable