博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kudu基本操作及概念
阅读量:4670 次
发布时间:2019-06-09

本文共 10468 字,大约阅读时间需要 34 分钟。

Kudu:

    针对 Apache Hadoop 平台而开发的列式存储管理器。
使用场景:
    适用于那些既有随机访问,也有批量数据扫描的复合场景。
    高计算量的场景。
    使用了高性能的存储设备,包括使用更多的内存。
    支持数据更新,避免数据反复迁移。
    支持跨地域的实时数据备份和查询。
    
kudu的关键机制:
1.模仿数据库,以二维表的形式组织数据,创建表的时候需要指定schema。所以只支持结构化数据。
2.每个表指定一个或多个主键。
3.支持insert/update/delete,这些修改操作全部要指定主键。
4.read操作,只支持scan原语。
5.一致性模型,默认支持snapshot ,这个可以保证scan和单个客户端 read-you-writes一致性保证。更强的一致性保证,提供manually propagate timestamps between clients或者commit-wait。
6.cluster类似hbase简单的M-S结构,master支持备份。
7.单个表支持水平分割,partitions叫tablets,单行一定在一个tablets里面,支持范围,以及list等更灵活的分区键。
8.使用Raft 协议,可以根据SLA指定备份块数量。
9.列式存储
10.delta flushes,数据先更新到内存中,最后在合并到最终存储中,有专门到后台进程负责。
11.Lazy Materialization ,对一些选择性谓词,可以帮助跳过很多不必要的数据。
12.支持和MR/SPARK/IMPALA等集成,支持Locality ,Columnar Projection ,Predicate pushdown 等。
注意:
1、建表的时候要求所有的tserver节点都活着
2、根据raft机制,允许(replication的副本数-)/ 2宕掉,集群还会正常运行,否则会报错找不到ip:7050(7050是rpc的通信端口号),需要注意一个问题,第一次运行的时候要保证集群处于正常状态下,也就是所有的服务都启动,如果运行过程中,允许(replication的副本数-)/ 2宕掉
3、读操作,只要有一台活着的情况下,就可以运行
maven 依赖:

org.apache.kudu
kudu-spark2_2.11
1.7.0
junit
junit
4.12
org.apache.spark
spark-core_2.11
2.3.0
org.apache.spark
spark-sql_2.11
2.3.0
org.apache.kudu
kudu-client
1.7.0
org.apache.kudu
kudu-client-tools
1.7.0

 

Java 代码:

import org.apache.kudu.ColumnSchema;import org.apache.kudu.Schema;import org.apache.kudu.Type;import org.apache.kudu.client.*;import org.apache.kudu.spark.kudu.KuduContext;import org.apache.spark.SparkConf;import org.apache.spark.SparkContext;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import org.junit.Test;import java.util.ArrayList;import java.util.Arrays;import java.util.List;/** * @Author:Xavier * @Data:2019-02-22 09:25 **/public class KuduOption {    // master地址    private static final String KUDU_MASTER = "nn02:7051";    private static String tableName = "KuduTest";    //创建表    @Test    public void CreateTab() {        // 创建kudu的数据库链接        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();        try {            // 设置表的schema(模式)            List
columns = new ArrayList(2); columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING).key(true).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build()); Schema schema = new Schema(columns); //创建表时提供的所有选项 CreateTableOptions options = new CreateTableOptions(); // 设置表的replica备份和分区规则 List
rangeKeys = new ArrayList<>(); rangeKeys.add("key"); // 一个replica options.setNumReplicas(1); // 用列rangeKeys做为分区的参照 options.setRangePartitionColumns(rangeKeys); client.createTable(tableName, schema, options); // 添加key的hash分区 //options.addHashPartitions(parcols, 3); } catch (Exception e) { e.printStackTrace(); } finally { try { client.shutdown(); } catch (Exception e) { e.printStackTrace(); } } } //向表内插入新数据 @Test public void InsertData() { // 创建kudu的数据库链接 KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build(); try { // 打开表 KuduTable table = client.openTable(tableName); // 创建写session,kudu必须通过session写入 KuduSession session = client.newSession(); // 采取Flush方式 手动刷新 session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); session.setMutationBufferSpace(3000); System.out.println("-------start--------" + System.currentTimeMillis()); for (int i = 1; i < 6100; i++) { Insert insert = table.newInsert(); // 设置字段内容 PartialRow row = insert.getRow(); row.addString("key", i+""); row.addString(1, "value"+i); session.flush(); session.apply(insert); } System.out.println("-------end--------" + System.currentTimeMillis()); } catch (Exception e) { e.printStackTrace(); } finally { try { client.shutdown(); } catch (Exception e) { e.printStackTrace(); } } } //更新数据 @Test public void kuduUpdateTest() { // 创建kudu的数据库链接 KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build(); try { KuduTable table = client.openTable(tableName); KuduSession session = client.newSession(); Update update = table.newUpdate(); PartialRow row = update.getRow(); // row.addString("key", 998 + ""); row.addString("value", "updata Data " + 10); session.flush(); session.apply(update);// System.out.print(operationResponse.getRowError()); } catch (Exception e) { e.printStackTrace(); } finally { try { client.shutdown(); } catch (Exception e) { e.printStackTrace(); } } } //根据主键删除数据 @Test public void deleteData(){ KuduClient client=new KuduClient.KuduClientBuilder(KUDU_MASTER).build(); try { KuduTable table=client.openTable(tableName); KuduSession session=client.newSession(); Delete delete=table.newDelete(); PartialRow row=delete.getRow(); row.addString("key","992"); session.apply(delete); } catch (KuduException e) { e.printStackTrace(); } } //扫描数据 @Test public void SearchData() { // 创建kudu的数据库链接 KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build(); try { KuduTable table = client.openTable(tableName); List
projectColumns = new ArrayList<>(1); projectColumns.add("value"); KuduScanner scanner = client.newScannerBuilder(table) .setProjectedColumnNames(projectColumns) .build(); while (scanner.hasMoreRows()) { RowResultIterator results = scanner.nextRows(); while (results.hasNext()) { RowResult result = results.next(); System.out.println(result.getString(0)); } } } catch (Exception e) { e.printStackTrace(); } finally { try { client.shutdown(); } catch (Exception e) { e.printStackTrace(); } } } //条件扫描数据 @Test public void searchDataByCondition(){ KuduClient client =new KuduClient.KuduClientBuilder(KUDU_MASTER).build(); try { KuduTable table=client.openTable(tableName); KuduScanner.KuduScannerBuilder scannerBuilder=client.newScannerBuilder(table); //设置搜索的条件 KuduPredicate predicate=KuduPredicate. newComparisonPredicate( table.getSchema().getColumn("key"),//设置要值的谓词(字段) KuduPredicate.ComparisonOp.EQUAL,//设置搜索逻辑 "991");//设置搜索条件值 scannerBuilder.addPredicate(predicate); // 开始扫描 KuduScanner scanner=scannerBuilder.build(); while(scanner.hasMoreRows()){ RowResultIterator iterator=scanner.nextRows(); while(iterator.hasNext()){ RowResult result=iterator.next(); System.out.println("输出: "+result.getString(0)+"--"+result.getString("value")); } } } catch (KuduException e) { e.printStackTrace(); } } //删除表 @Test public void DelTab() { KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build(); try { client.deleteTable(tableName); } catch (Exception e) { e.printStackTrace(); } finally { try { client.shutdown(); } catch (Exception e) { e.printStackTrace(); } } } // @Test public void searchBysparkSql() { SparkSession sparkSession = getSparkSession(); List
fields = Arrays.asList( DataTypes.createStructField("key", DataTypes.StringType, true), DataTypes.createStructField("value", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); Dataset ds = sparkSession.read().format("org.apache.kudu.spark.kudu"). schema(schema).option("kudu.master", "nn02:7051").option("kudu.table", "KuduTest").load(); ds.registerTempTable("abc"); sparkSession.sql("select * from abc").show(); } @Test public void checkTableExistByKuduContext() { SparkSession sparkSession = getSparkSession(); KuduContext context = new KuduContext("172.19.224.213:7051", sparkSession.sparkContext()); System.out.println(tableName + " is exist = " + context.tableExists(tableName)); } public SparkSession getSparkSession() { SparkConf conf = new SparkConf().setAppName("test") .setMaster("local[*]") .set("spark.driver.userClassPathFirst", "true"); conf.set("spark.sql.crossJoin.enabled", "true"); SparkContext sparkContext = new SparkContext(conf); SparkSession sparkSession = SparkSession.builder().sparkContext(sparkContext).getOrCreate(); return sparkSession; }}

 

转载于:https://www.cnblogs.com/xavier-xd/p/10417805.html

你可能感兴趣的文章
Nginx 常用命令总结
查看>>
hall wrong behavior
查看>>
Markdown test
查看>>
Collection集合
查看>>
int最大值+1为什么是-2147483648最小值-1为什么是2147483647
查看>>
【C++】const在不同位置修饰指针变量
查看>>
github新项目挂历模式
查看>>
编写jquery插件
查看>>
敏捷开发笔记
查看>>
神秘海域:顶级工作室“顽皮狗”成长史(下)
查看>>
C++指针、引用知多少?
查看>>
services 系统服务的启动、停止、卸载
查看>>
Fiddler 网页采集抓包利器__手机app抓包
查看>>
Number and String
查看>>
java中的值传递和引用传递2<原文:http://blog.csdn.net/niuniu20008/article/details/2953785>...
查看>>
css实现背景图片模糊
查看>>
什么是runtime?什么是webgl?
查看>>
秋季学习总结
查看>>
categorical_crossentropy VS. sparse_categorical_crossentropy
查看>>
强引用,弱引用,4种Java引用浅解(涉及jvm垃圾回收)
查看>>