六、HBASE原理介绍

大纲:

认识HBase

HBase架构

HBase读写流程

HBase使用场景

HBase的MapReduce

1、认识HBase

定义:

HBase是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建大规模结构化的存储集群。HBase的目标是存储并处理大型数据,具体来说是仅需使用普通的硬件配置,就能够处理由成千上万的行和列所组成的大型数据。

与MapReduce的离线批处理计算框架不同,HBase是一个可以随机访问的存储和检索数据平台,弥补了HDFS不能随机访问数据的缺陷,适合实时性要求不是非常高的业务场景。HBase存储的都是Byte数组,它不介意数据类型,允许动态、灵活的数据模型。

Hadoop 2.0生态系统中的各层结构

4

其中HBase位于结构化存储层,HDFS为HBase提供了高可靠性的底层存储支持, MapReduce为HBase提供了高性能的批处理能力,Zookeeper为HBase提供了稳定服务和failover机制,Pig和Hive为HBase提供了进行数据统计处理的高层语言支持,Sqoop则为HBase提供了便捷的RDBMS数据导入功能,使业务数据从传统数据库向HBase迁移变的非常方便。

设计思路:

HBase是一个分布式的数据库,使用Zookeeper管理集群,使用HDFS作为底层存储。在架构层面上由HMaster(Zookeeper选举产生的Leader)和多个HRegionServer组成,基本架构如下图所示:

5

在HBase的概念中,HRegionServer对应集群中的一个节点,一个HRegionServer负责管理多个HRegion,而一个HRegion代表一张表的一部分数据。在HBase中,一张表可能会需要很多个HRegion来存储数据,每个HRegion中的数据并不是杂乱无章的。HBase在管理HRegion的时候会给每个HRegion定义一个Rowkey的范围,落在特定范围内的数据将交给特定的Region,从而将负载分摊到多个节点,这样就充分利用了分布式的优点和特性。另外,HBase会自动调节Region所处的位置,如果一个HRegionServer过热,即大量的请求落在这个HRegionServer管理的HRegion上,HBase就会把HRegion移动到相对空闲的其它节点,依次保证集群环境被充分利用。

2、HBase架构

HBase由HMaster和HRegionServer组成,同样遵从主从服务器架构。HBase将逻辑上的表划分成多个数据块即HRegion,存储在HRegionServer中。HMaster负责管理所有的HRegionServer,它本身并不存储任何数据,而只是存储数据到HRegionServer的映射关系(元数据)。集群中的所有节点通过Zookeeper进行协调,并处理HBase运行期间可能遇到的各种问题。HBase的基本架构如下图所示:

1

Client

使用HBase的RPC机制与HMaster和HRegionServer进行通信,提交请求和获取结果。对于管理类操作,Client与HMaster进行RPC;对于数据读写类操作,Client与HRegionServer进行RPC。

Zookeeper

通过将集群各节点状态信息注册到Zookeeper中,使得HMaster可随时感知各个HRegionServer的健康状态,而且也能避免HMaster的单点问题。

HMaster

管理所有的HRegionServer,告诉其需要维护哪些HRegion,并监控所有HRegionServer的运行状态。当一个新的HRegionServer登录到HMaster时,HMaster会告诉它等待分配数据;而当某个HRegion死机时,HMaster会把它负责的所有HRegion标记为未分配,然后再把它们分配到其他HRegionServer中。HMaster没有单点问题,HBase可以启动多个HMaster,通过Zookeeper的选举机制保证集群中总有一个HMaster运行,从而提高了集群的可用性。

HRegion

当表的大小超过预设值的时候,HBase会自动将表划分为不同的区域,每个区域包含表中所有行的一个子集。对用户来说,每个表是一堆数据的集合,靠主键(RowKey)来区分。从物理上来说,一张表被拆分成了多块,每一块就是一个HRegion。我们用表名+开始/结束主键,来区分每一个HRegion,一个HRegion会保存一个表中某段连续的数据,一张完整的表数据是保存在多个HRegion中的。

HRegionServer

HBase中的所有数据从底层来说一般都是保存在HDFS中的,用户通过一系列HRegionServer获取这些数据。集群一个节点上一般只运行一个HRegionServer,且每一个区段的HRegion只会被一个HRegionServer维护。HRegionServer主要负责响应用户I/O请求,向HDFS文件系统读写数据,是HBase中最核心的模块。HRegionServer内部管理了一系列HRegion对象,每个HRegion对应了逻辑表中的一个连续数据段。HRegion由多个HStore组成,每个HStore对应了逻辑表中的一个列族的存储,可以看出每个列族其实就是一个集中的存储单元。因此,为了提高操作效率,最好将具备共同I/O特性的列放在一个列族中。

HStore

它是HBase存储的核心,由MemStore和StoreFiles两部分组成。MemStore是内存缓冲区,用户写入的数据首先会放入MemStore,当MemStore满了以后会Flush成一个StoreFile(底层实现是HFile),当StoreFile的文件数量增长到一定阈值后,会触发Compact合并操作,将多个StoreFiles合并成一个StoreFile,合并过程中会进行版本合并和数据删除操作。因此,可以看出HBase其实只有增加数据,所有的更新和删除操作都是在后续的Compact过程中进行的,这样使得用户的写操作只要进入内存就可以立即返回,保证了HBaseI/O的高性能。当StoreFiles Compact后,会逐步形成越来越大的StoreFile,当单个StoreFile大小超过一定阈值后,会触发Split操作,同时把当前的HRegion Split成2个HRegion,父HRegion会下线,新分出的2个子HRegion会被HMaster分配到相应的HRegionServer,使得原先1个HRegion的负载压力分流到2个HRegion上。

HLog

每个HRegionServer中都有一个HLog对象,它是一个实现了Write Ahead Log的预写日志类。在每次用户操作将数据写入MemStore的时候,也会写一份数据到HLog文件中,HLog文件会定期滚动刷新,并删除旧的文件(已持久化到StoreFile中的数据)。当HMaster通过Zookeeper感知到某个HRegionServer意外终止时,HMaster首先会处理遗留的 HLog文件,将其中不同HRegion的HLog数据进行拆分,分别放到相应HRegion的目录下,然后再将失效的HRegion重新分配,领取到这些HRegion的HRegionServer在加载 HRegion的过程中,会发现有历史HLog需要处理,因此会Replay HLog中的数据到MemStore中,然后Flush到StoreFiles,完成数据恢复。

ROOT表和META表

HBase的所有HRegion元数据被存储在.META.表中,随着HRegion的增多,.META.表中的数据也会增大,并分裂成多个新的HRegion。为了定位.META.表中各个HRegion的位置,把.META.表中所有HRegion的元数据保存在-ROOT-表中,最后由Zookeeper记录-ROOT-表的位置信息。 所有客户端访问用户数据前,需要首先访问Zookeeper获得-ROOT-的位置,然后访问-ROOT-表获得.META.表的位置,最后根据.META.表中的信息确定用户数据存放的位置,如下图所示:

6

-ROOT-表永远不会被分割,它只有一个HRegion,这样可以保证最多只需要三次跳转就可以定位任意一个HRegion。为了加快访问速度,.META.表的所有HRegion全部保存在内存中。客户端会将查询过的位置信息缓存起来,且缓存不会主动失效。如果客户端根据缓存信息还访问不到数据,则询问相关.META.表的Region服务器,试图获取数据的位置,如果还是失败,则询问-ROOT-表相关的.META.表在哪里。最后,如果前面的信息全部失效,则通过ZooKeeper重新定位HRegion的信息。所以如果客户端上的缓存全部是失效,则需要进行6次网络来回,才能定位到正确的HRegion。

逻辑模型:

HBase是一个类似于BigTable的分布式数据库,它是一个稀疏的长期存储的(存在HDFS上)、多维度的、排序的映射表。这张表的索引是行关键字、列关键字和时间戳。HBase的数据都是字符串,没有类型。

2

可以将一个表想象成一个大的映射关系,通过行键、行键+时间戳或行键+列(列族:列修饰符),就可以定位特定数据。由于HBase是稀疏存储数据的,所以某些列可以是空白的。上表给出了com.cnn.www网站的数据存放逻辑视图,表中仅有一行数据,行的唯一标识为“com.cnn.www”,对这行数据的每一次逻辑修改都有一个时间戳关联对应。表中共有四列:contents:html、anchor:cnnsi.com、anchor:my.look.ca、mime:type,每一列以前缀的方式给出其所属的列族。

行键(RowKey)是数据行在表中的唯一标识,并作为检索记录的主键。在HBase中访问表中的行只有三种方式:通过某个行键访问、给定行键的范围访问、全表扫描。行键可以是任意字符串(最大长度64KB)并按照字典序进行存储。对于那些经常一起读取的行,需要对键值精心设计,以便它们能放在一起存储。

3、HBase的读写流程

HRegionServer数据存储关系图

3

上文提到,HBase使用MemStore和StoreFile存储对表的更新。数据在更新时首先写入HLog和MemStore。MemStore中的数据是排序的,当MemStore累计到一定阈值时,就会创建一个新的MemStore,并且将老的MemStore添加到Flush队列,由单独的线程Flush到磁盘上,成为一个StoreFile。与此同时,系统会在Zookeeper中记录一个CheckPoint,表示这个时刻之前的数据变更已经持久化了。当系统出现意外时,可能导致MemStore中的数据丢失,此时使用HLog来恢复CheckPoint之后的数据。

StoreFile是只读的,一旦创建后就不可以再修改。因此Hbase的更新其实是不断追加的操作。当一个Store中的StoreFile达到一定阈值后,就会进行一次合并操作,将对同一个key的修改合并到一起,形成一个大的StoreFile。当StoreFile的大小达到一定阈值后,又会对 StoreFile进行切分操作,等分为两个StoreFile。

写操作流程

1. Client通过Zookeeper的调度,向HRegionServer发出写数据请求,在HRegion中写数据。

2. 数据被写入HRegion的MemStore,直到MemStore达到预设阈值。

3. MemStore中的数据被Flush成一个StoreFile。

4. 随着StoreFile文件的不断增多,当其数量增长到一定阈值后,触发Compact合并操作,将多个StoreFile合并成一个StoreFile,同时进行版本合并和数据删除。

5. StoreFiles通过不断的Compact合并操作,逐步形成越来越大的StoreFile。

6. 单个StoreFile大小超过一定阈值后,触发Split操作,把当前HRegion Split成2个新的HRegion。父HRegion会下线,新Split出的2个子HRegion会被HMaster分配到相应的HRegionServer 上,使得原先1个HRegion的压力得以分流到2个HRegion上。

读操作流程

1. client访问Zookeeper,查找-ROOT-表,获取.META.表信息。

2. 从.META.表查找,获取存放目标数据的HRegion信息,从而找到对应的HRegionServer。

3. 通过HRegionServer获取需要查找的数据。

4. HRegionserver的内存分为MemStore和BlockCache两部分,MemStore主要用于写数据,BlockCache主要用于读数据。读请求先到MemStore中查数据,查不到就到BlockCache中查,再查不到就会到StoreFile上读,并把读的结果放入BlockCache。

4、HBase使用场景

半结构化或非结构化数据

对于数据结构字段不够确定或杂乱无章,很难按一个概念去进行抽取的数据适合用HBase。如随着业务发展需要存储更多的字段时,RDBMS需要停机维护更改表结构,而HBase支持动态增加。

记录非常稀疏

RDBMS的行有多少列是固定的,为空的列浪费了存储空间。而HBase为空的列不会被存储,这样既节省了空间又提高了读性能。

多版本数据

根据RowKey和列标识符定位到的Value可以有任意数量的版本值(时间戳不同),因此对于需要存储变动历史记录的数据,用HBase将非常方便。

超大数据量

当数据量越来越大,RDBMS数据库撑不住了,就出现了读写分离策略,通过一个Master专门负责写操作,多个Slave负责读操作,服务器成本倍增。随着压力增加,Master撑不住了,这时就要分库了,把关联不大的数据分开部署,一些join查询不能用了,需要借助中间层。随着数据量的进一步增加,一个表的记录越来越大,查询就变得很慢,于是又得搞分表,比如按ID取模分成多个表以减少单个表的记录数。经历过这些事的人都知道过程是多么的折腾。采用HBase就简单了,只需要在集群中加入新的节点即可,HBase会自动水平切分扩展,跟Hadoop的无缝集成保障了数据的可靠性(HDFS)和海量数据分析的高性能(MapReduce)。

5、HBase的MapReduce

7

HBase中Table和Region的关系,有些类似HDFS中File和Block的关系。由于HBase提供了配套的与MapReduce进行交互的API如TableInputFormat和TableOutputFormat,可以将HBase的数据表直接作为Hadoop MapReduce的输入和输出,从而方便了MapReduce应用程序的开发,基本不需要关注HBase系统自身的处理细节。

七、HBase开发实例上

大纲:

演示HBase的put,delete,get,scan的java代码编写

使用eclipse创建一个项目,引入指定的jar包,其中这个过程也可以通过maven来构建项目,在pom.xml中添加hbase的依赖如下:

<repositories>
	<repository>
		<id>cloudera</id>
		<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
	</repository>
</repositories>

<dependencies>
	<dependency>
		<groupId>junit</groupId>
		<artifactId>junit</artifactId>
		<version>3.8.1</version>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-common</artifactId>
		<version>2.6.0-cdh5.7.1</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-hdfs</artifactId>
		<version>2.6.0-cdh5.7.1</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hbase</groupId>
		<artifactId>hbase-client</artifactId>
		<version>1.2.0-cdh5.7.1</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hbase</groupId>
		<artifactId>hbase-server</artifactId>
		<version>1.2.0-cdh5.7.1</version>
	</dependency>
</dependencies>

接着直接编写java代码

首先需要设置HBase的配置,如ZooKeeper的地址、端口号等等。可以通过org.apache.hadoop.conf.Configuration.set方法手工设置HBase的配置信息,也可以直接将HBase的hbase-site.xml配置文件引入项目即可。下面给出配置代码:

// 声明静态配置
private static Configuration conf = null;
static {
  conf = HBaseConfiguration.create();
  conf.set("hbase.zookeeper.quorum", "localhost");
  conf.set("hbase.zookeeper.property.clientPort", "2181");
}

HBase的常用操作包括建表、插入表数据、删除表数据、获取一行数据、表扫描、删除列族、删除表等等,下面给出具体代码。

// 创建数据库表
public static void createTable(String tableName, String[] columnFamilys) throws IOException {
  // 建立一个数据库的连接
  Connection conn = ConnectionFactory.createConnection(conf);
  // 创建一个数据库管理员
  HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
  if (hAdmin.tableExists(tableName)) {
    System.out.println(tableName + "表已存在");
    conn.close();
    System.exit(0);
  } else {
    // 新建一个表描述
    HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
    // 在表描述里添加列族
    for (String columnFamily : columnFamilys) {
      tableDesc.addFamily(new HColumnDescriptor(columnFamily));
    }
    // 根据配置好的表描述建表
    hAdmin.createTable(tableDesc);
    System.out.println("创建" + tableName + "表成功");
  }
  conn.close();
}
// 添加一条数据
public static void addRow(String tableName, String rowKey, String columnFamily, String column, String value)
throws IOException {
  // 建立一个数据库的连接
  Connection conn = ConnectionFactory.createConnection(conf);
  // 获取表
  HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
  // 通过rowkey创建一个put对象
  Put put = new Put(Bytes.toBytes(rowKey));
  // 在put对象中设置列族、列、值
  put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
  // 插入数据,可通过put(List<Put>)批量插入
  table.put(put);
  // 关闭资源
  table.close();
  conn.close();
}
// 通过rowkey获取一条数据
public static void getRow(String tableName, String rowKey) throws IOException {
  // 建立一个数据库的连接
  Connection conn = ConnectionFactory.createConnection(conf);
  // 获取表
  HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
  // 通过rowkey创建一个get对象
  Get get = new Get(Bytes.toBytes(rowKey));
  // 输出结果
  Result result = table.get(get);
  for (Cell cell : result.rawCells()) {
    System.out.println(
      "行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
      "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" +
      "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" +
      "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
      "时间戳:" + cell.getTimestamp());
  }
  // 关闭资源
  table.close();
  conn.close();
}
// 全表扫描
public static void scanTable(String tableName) throws IOException {
  // 建立一个数据库的连接
  Connection conn = ConnectionFactory.createConnection(conf);
  // 获取表
  HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
  // 创建一个扫描对象
  Scan scan = new Scan();
  // 扫描全表输出结果
  ResultScanner results = table.getScanner(scan);
  for (Result result : results) {
    for (Cell cell : result.rawCells()) {
      System.out.println(
        "行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
        "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" +
        "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" +
        "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
        "时间戳:" + cell.getTimestamp());
    }
  }
  // 关闭资源
  results.close();
  table.close();
  conn.close();
}
// 删除一条数据
public static void delRow(String tableName, String rowKey) throws IOException {
  // 建立一个数据库的连接
  Connection conn = ConnectionFactory.createConnection(conf);
  // 获取表
  HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
  // 删除数据
  Delete delete = new Delete(Bytes.toBytes(rowKey));
  table.delete(delete);
  // 关闭资源
  table.close();
  conn.close();
}
// 删除多条数据
public static void delRows(String tableName, String[] rows) throws IOException {
  // 建立一个数据库的连接
  Connection conn = ConnectionFactory.createConnection(conf);
  // 获取表
  HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
  // 删除多条数据
  List<Delete> list = new ArrayList<Delete>();
  for (String row : rows) {
    Delete delete = new Delete(Bytes.toBytes(row));
    list.add(delete);
  }
  table.delete(list);
  // 关闭资源
  table.close();
  conn.close();
}
// 删除列族
public static void delColumnFamily(String tableName, String columnFamily) throws IOException {
  // 建立一个数据库的连接
  Connection conn = ConnectionFactory.createConnection(conf);
  // 创建一个数据库管理员
  HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
  // 删除一个表的指定列族
  hAdmin.deleteColumn(tableName, columnFamily);
  // 关闭资源
  conn.close();
}
// 删除数据库表
public static void deleteTable(String tableName) throws IOException {
  // 建立一个数据库的连接
  Connection conn = ConnectionFactory.createConnection(conf);
  // 创建一个数据库管理员
  HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
  if (hAdmin.tableExists(tableName)) {
    // 失效表
    hAdmin.disableTable(tableName);
    // 删除表
    hAdmin.deleteTable(tableName);
    System.out.println("删除" + tableName + "表成功");
    conn.close();
  } else {
    System.out.println("需要删除的" + tableName + "表不存在");
    conn.close();
    System.exit(0);
  }
}

 

八、mapreduce开发实例中

大纲:

演示实例讲解

演示编写mapreduce实例

处理前的数据样例:

1

三个维度,第一个维度是名字,第二个维度是某人某人某天在某网站停留的时间,第三个维度是网站的名字。

处理后的数据样例:

2

编写过程:

在eclipse中建立工程,名为:mapreduce2

建立一个包,名为com.jkb.mapreduce

在包中创建一个类,名为UserWeb

主要代码如下:

public static class WebMap extends Mapper<Object, Text, Text, Text>{
	
	@Override
	public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
		String[] line = value.toString().split("\t");
		if(line.length == 3){
			String name = line[0];
			String time = line[1];
			String infor = line[2];
			context.write(new Text(name + "\t" + time), new Text(time + "\t" + info));
		}
	}
}

public static classs Partition extends Partitione<Text, Text>{

	@Override
	public int getPartition(Text key, Text value, int number){
		String name = key.toString().split("\t")[0];
		int hash = name.hashCode();
		return Math.abs(hash % number);
	}
}

public static class Sort extends WritableComparator{
	
	protected Sort(){
		super(Text.class, true);
	}
	
	@Override
	public int compare(WritableComparable w1, WritalbeComparable w2){
		Text h1 = new Text(((Text)w1).toString().split("\t")[0]);
		Text h2 = new Text(((Text)w2).toString().split("\t")[0]);
		
		IntWritable M1 = new IntWritable(Integer.valueOf((Text)w1.toString().split("\t")[1]));
		IntWritable M2 = new IntWritable(Integer.valueOf((Text)w2.toString().split("\t")[1]));
		
		int R;
		if(h1.equals(h2)){
			R = M2.compareTo(M1);
		}else{
			R = h1.compareTo(h2);
		}
		return R;
	}
}

public static class Group extends WritableComparator{
	protected Group(){
		super(Text.class, true);
	}
	
	@Override
	public int compare(WritableComparable w1, WritalbeComparable w2){
		Text h1 = new Text(((Text)w1).toString().split("\t")[0]);
		Text h2 = new Text(((Text)w2).toString().split("\t")[0]);
		
		int R;
		if(h1.equals(h2)){
			R = 0;
		}else{
			R = h1.compareTo(h2);
		}
		
		return R;
		
	}
}

public static class WebReduce extends Reducer<Text, Text, IntWritable, Text>{
	
	@Override
	public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
		int count = 0;
		String name = key.toString().split("\t")[0];
		for(Text t : values){
			count++;
			StringBuffer buffer = new StringBuffer();
			buffer.append(name);
			buffer.append("\t");
			buffer.appedn(t.toString());
			context.write(new IntWritable(count), new Text(buffer.toString()));
		}
	}
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
	if(args.length != 4){
		System.out.println("error");
		System.exit(0);
	}
	int SplitMB = Integer.valueOf(args[2]);
	String dst = args[0];
	String out = args[1];
	
	Configuration conf = new Configuration();
	conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(SplitMB * 1024 * 1024));
	conf.set("mapred.min.split.size", String.valueOf(SplitMB * 1024 * 1024));
	conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(SplitMB * 1024 * 1024));
	conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(SplitMB * 1024 * 1024));
	
	Job job = new Job(conf);
	FileInputFormat.addInputPath(job, new Path(dst));
	FileOutputFormat.setOutputPath(job, new Path(out));
	job.setNumReduceTasks(Integer.valueOf(args[3]));
	job.setPartitionerClass(Partition.class);
	job.setGroupingComparatorClass(Group.class);
	job.setSortCompartorClass(Sort.class);
	job.setMapperClass(WebMap.class);
	job.setReducerClass(WebReduce.class);
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(Text.class);
	job.setOutputKeyClass(IntWritable.class);
	job.setOutputValueClass(Text.class);
	job.setJarByClass(UserWeb.class);
	job.waitForCompletion(true);
}

程序完成之后,打包为userweb.jar。上传至集群。

数据在 /edu/mr2

使用hadoop jar userweb.jar com.jkb.mapreduce.UserWeb /edu/mr2 /edu/out 128 1 执行。

 

如果觉得我的文章对你有用,请随意赞赏