0%

1. 新建仓库

上 github 新建一个仓库,仓库名必须为 .github.io 格式,其中 是你 github 的昵称。不要问我为什么,问我我也不想回答你。

2. 全局安装hexo

打开命令行,输入下面的命令,全局安装 hexo。

1
npm install -g hexo

3. 初始化项目

Just one 命令,hexo init,就会自动构建一个 hexo 项目,紧接着执行 hexo s,浏览器访问 localhost:4000 就可以看到效果啦!过程一定要快,凉了就不好吃了!

1
2
3
4
hexo init

// 本地运行
hexo s

4. 部署到github

快速在项目根目录下找到 _congif.yml,找到 deploy 字段并填写完整,下面请开始你上学时最擅长的事情:抄写!

1
2
3
4
5
6
# Deployment
## Docs: https://hexo.io/docs/deployment.html
deploy:
type: git
repo: <你的仓库地址> # https://github.com/TJ-XiaJiaHao/TJ-XiaJiaHao.github.io
branch: master

抄写完毕!但是!我们需要额外的一个工具来帮助我们推到仓库上,那就是!那就是!那就是 hexo-deployer-git。搞它!

1
npm install hexo-deployer-git --save

万事俱备,只欠东南风!执行下面两个命令,就可以把项目自动部署到 github 上。
这俩命令顺序一定不能错

1
2
3
hexo clean

hexo deploy
  1. 查看效果

浏览器访问:https://lujianyun06.github.io/ 即可看到效果。
至此,已经完成了个人博客的搭建,也可以在浏览器中访问,下面将介绍一些常用的野外生存技能。

如何创建新文章

1
2
3
4
// [layout] 为布局,可选项为 `post`、`page`、`draft`,这将决定文章所在文件路径。
// <title> 为文章标题
// 如 hexo new post 除了帅气,我还有啥!
hexo new [layout] <title>

更换主题皮肤
风格不喜欢?换之。

更换主题流程:下载主题 -> 配置主题,以 xoxo 为例

下载到themes文件夹下

git clone https://github.com/KevinOfNeu/hexo-theme-xoxo xoxo

修改 _config.yml 配置

theme: xoxo
部署优化
每次都要执行 hexo clean 和 hexo deploy,不如写个新的脚本

1
2
3
// package.json
"dev": "hexo s",
"build": "hexo clean && hexo deploy"

部署命令

1
npm run build

如何引用本地图片?

首先在source下创建images page文件夹,必须用以下命令创建,否则没有index.md, 图片会显示错误

1
hexo new page images

然后把图片img1.png放到 hexo_dir/source/images/ 中,然后可以直接在文章中用

1
![图片的alt](/images/img1.png 'title')

来使用

》如果没有交换机,那么routekey就必须是队列名,否则通道不知道要把消息传到哪个队列中去

》RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。

假如信息a已经被发送给了消费者1,那么一定会等到信息a的ack返回后,才会把信息b发送给消费者2(即下一个消费者),不管消费者1空闲与否或者其他消费者空闲与否,这里应该叫“轮流,顺上来”的概念
即默认情况下,当一个信息被发送给一个消费者,那么下一个信息一定会被发送给下一个消费者,而不是其他消费者。
当然,只有1个消费者时,由于它自己就是下一个消费者,那就一直是它获得消息
怎样才能做到按照每个消费者的能力分配消息呢?联合使用 Qos 和 Acknowledge 就可以做到。
basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视角去看,总是会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。

  1. 轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。

  2. 公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。

为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
还有一点需要注意,使用非公平分发,必须关闭自动应答,改为手动应答。

1
2
3
4
5
6
7
8
9
10
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

//开启这行 表示使用手动确认模式
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

同时改为手动确认:
// 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, false, consumer);
这样的话,消费快的消费者能得到更多的消息

》如果交换机设置为fanout类型,那么routekey是不会起作用的,交换机一定会把它收到的所有消息都发给所有与它绑定的队列,只有当类型为topic或direct时才会起作用

》消息无法由exchange路由到合适的队列的处理方法(这种情况一定是direct或topic,fanout不存在这种问题)

》1.设置mandatory参数为true

上一篇文章中我们知道,生产者将消息发送到RabbitMQ的交换器中通过RoutingKey与BindingKey的匹配将之路由到具体的队列中以供消费者消费。那么当我们通过匹配规则找不到队列的时候,消息将何去何从呢?Rabbit给我们提供了两种方式。mandatory与备份交换器。
mandatory参数是channel.BasicPublish方法中的参数。其主要功能是消息传递过程中不可达目的地时将消息返回给生产者。当mandatory 参数设为true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ 会调用BasicReturn 命令将消息返回给生产者。当mandatory 参数设置为false 时。则消息直接被丢弃。其运转流程与实现代码如下(以C# RabbitMQ.Client 3.6.9为例):
注意,这和死信队列没关系,死信是已经在队列中的内容过期后的处理措施,这个mandatory是交换器把这个消息路由不到任何消息队列的处理方法。
1
2
3
4
5
6
7
8
9
10
String message = "hello world";
channel.basicPublish("eee", "", true, properties, message.getBytes());
System.out.println("[x] Sent '" + message + "'");

channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("replyCode="+replyCode + " replyText="+replyText+" exchange="+exchange + " routeKey="+routingKey+" body="+new String(body)); //#1
}
});
注意,由于是异步的,如果主线程先结束了,#1处的代码可能还没执行就整体结束了。

》设置备份exchange

当消息不能路由到队列时,通过mandatory设置参数,我们可以将消息返回给生产者处理。但这样会有一个问题,就是生产者需要开一个回调的函数来处理不能路由到的消息,这无疑会增加生产者的处理逻辑。备份交换器(Altemate Exchange)则提供了另一种方式来处理不能路由的消息。备份交换器可以将未被路由的消息存储在RabbitMQ中,在需要的时候去处理这些消息。其主要实现代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
Map<String, Object> arguments = new HashMap<String, Object>(16);
arguments.put("alternate-exchange", "backup");
//普通交换器和普通队列
channel.exchangeDeclare("normal", "direct", true, false, arguments);
channel.queueDeclare("q1", false, false, false, null);
channel.queueBind("q1", "normal", "r1");

//备份交换器和备份队列
channel.exchangeDeclare("backup", "fanout", true, false, null);
channel.queueDeclare("qb", false, false, false, null);
channel.queueBind("qb", "backup", "");

channel.basicPublish("normal", "", null, "abcdef".getBytes());

备份交换器其实和普通的交换器没有太大的区别,为了方便使用,建议设置为fanout类型,若设置为direct 或者topic的类型。需要注意的是,消息被重新发送到备份交换器时的路由键和从生产者发出的路由键是一样的。考虑这样一种情况,如果备份交换器的类型是direct,并且有一个与其绑定的队列,假设绑定的路由键是key1,当某条携带路由键为key2 的消息被转发到这个备份交换器的时候,备份交换器没有匹配到合适的队列,则消息丢失。如果消息携带的路由键为key1,则可以存储到队列中。
对于备份交换器,有以下几种特殊情况:

  • 如果设置的备份交换器不存在,客户端和RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器没有绑定任何队列,客户端和RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器没有任何匹配的队列,客户端和RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器和mandatory参数一起使用,那么mandatory参数无效。

》设置消息的TTL:

目前有两种方法可以设置消息的TTL。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息本身进行单独设置,每条消息的TTL可以不同。如果两种方法一起使用,则消息的TTL 以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的TTL值时,就会变成"死信" (Dead Message) ,消费者将无法再收到该消息。(有关死信队列请往下看)
1
2
3
4
5
6
7
8
9
10
11
12
方法一:给队列参数上加上x-message-ttl属性,这样所有进入该队列的消息都会有统一的过期时间
Map<String, Object> arguments = new HashMap<String, Object>(16);
arguments.put("x-message-ttl", 10000);
arguments.put("x-max-priority", 10);
channel.exchangeDeclare("eee", "fanout", true, false, null);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
方法二:给消息上加上属性:
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("20000") //表示20000ms
.build();
String message = "hello world";
channel.basicPublish("eee", "", properties, message.getBytes());

注意:对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而在第二种方法中,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的。Why?在第一种方法里,队列中己过期的消息肯定在队列头部, RabbitMQ 只要定期从队头开始扫描是否有过期的消息即可。而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。

》设置队列的TTL

注意,这里和上述通过队列设置消息的TTL不同。上面删除的是消息,而这里删除的是队列。通过channel.queueDeclare 方法中的x-expires参数可以控制队列被自动删除前处于未使用状态的时间。这个未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过channel.basicGet命令。
设置队列里的TTL可以应用于类似RPC方式的回复队列,在RPC中,许多队列会被创建出来,但是却是未被使用的(有关RabbitMQ实现RPC请往下看)。RabbitMQ会确保在过期时间到达后将队列删除,但是不保障删除的动作有多及时。在RabbitMQ 重启后, 持久化的队列的过期时间会被重新计算。用于表示过期时间的x-expires参数以毫秒为单位, 井且服从和x-message-ttl一样的约束条件,不同的是它不能设置为0(会报错)。
1
2
3
4
Map<String, Object> arguments = new HashMap<String, Object>(16);
arguments.put("x-expires", 2000);
channel.exchangeDeclare("eee", "fanout", true, false, null);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);

》死信队列

DLX(Dead-Letter-Exchange)死信交换器,当消息在一个队列中变成死信之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX ,绑定DLX的队列就称之为死信队列。

消息变成死信主要有以下几种情况:

  • 消息被拒绝(BasicReject/BasicNack) ,井且设置requeue 参数为false;
  • 消息过期;
  • 队列达到最大长度。
    DLX也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消息、以进行相应的处理。
    通过在channel.queueDeclare 方法中设置x-dead-letter-exchange参数来为这个队列添加DLX。其示例代码如下:

》对于队列参数的设置,map的内容必须是在声明队列之前就有了,不能设置了arguments后再往map中放东西

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Map<String, Object> arguments = new HashMap<String, Object>(16);
//死信队列配置 ----------------
String dlxExchangeName = "dlx.exchange";
String dlxQueueName = "dlx.queue";
String dlxRoutingKey = "#";

// 为队列设置队列交换器
arguments.put("x-dead-letter-exchange", dlxExchangeName); //只能设置死信交换器,不能直接把死信队列与其他队列绑定
// 设置队列中的消息 3s 钟后过期
arguments.put("x-message-ttl", 3000);
channel.exchangeDeclare("eee", "fanout", true, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
//给arguments中设置内容不能放在这句后面不然参数不起作用
channel.queueBind(QUEUE_NAME, "eee", "”);

// 创建死信交换器和队列
channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
channel.queueDeclare(dlxQueueName, true, false, false, null);
channel.queueBind(dlxQueueName, dlxExchangeName, "#");

》必须有一个交换器与之绑定。死信队列一定要持久化,但设置死信队列的队列不一定要持久化**

死信队列和死信交换器的声明和普通队列,交换器没什么区别(除了一定要持久化),表明它是死信交换器的位置是把它的名字设置为一个其他队列的“x-dead-letter-exchange” 参数

》可以用过期队列+死信队列 来模拟延迟队列**

生产者将消息发送到过期时间为n的队列中,这个队列并未有消费者来消费消息,当过期时间到达时,消息会通过死信交换器被转发到死信队列中。而消费者从死信队列中消费消息。这个时候就达到了生产者发布了消息在讲过了n时间后消费者消费了消息,起到了延迟消费的作用。
 延迟队列在我们的项目中可以应用于很多场景,如:下单后两个消息取消订单,七天自动收货,七天自动好评,密码冻结后24小时解冻,以及在分布式系统中消息补偿机制(1s后补偿,10s后补偿,5m后补偿......)。

》优先级队列:

就像我们生活中的“特殊”人士一样,我们的业务上也存在一些“特殊”消息,可能需要优先进行处理,在生活上我们可能会对这部分特殊人士开辟一套VIP通道,而Rabbit同样也有这样的VIP通道(前提是在3.5的版本以后),即优先级队列,队列中的消息会有优先级。优先级高的消息具备优先被消费的特权。针对这些VIP消息,我们只需做两件事:
我们只需做两件事情:
1.将队列声明为优先级队列,即在创建队列的时候添加参数 x-max-priority 以指定最大的优先级,值为0-255(整数)。
2.为优先级消息添加优先级。
其示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
channel.exchangeDeclare("exchange.priority", "direct", true);//定义交换器
Map<String, Object> arguments = new HashMap<String, Object>(16);
// 为队列设置队列交换器
arguments.put("x-dead-letter-exchange", dlxExchangeName);
// 设置队列中的消息 ms 钟后过期
arguments.put("x-message-ttl", 10000);
arguments.put("x-max-priority", 10);args.Add("x-max-priority", 10);//定义优先级队列的最大优先级为10
channel.queueDeclare("queue.priority", true, false, false, args);//定义优先级队列
channel.queueBind("queue.priority", "exchange.priority", "priorityKey");//队列交换器绑定
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.priority(8)
.build();
var message = “testMsg8";
//发布消息
channel.BasicPublish("exchange.priority", "priorityKey", properties, message);

注意:没有指定优先级的消息会将优先级以0对待。 对于超过优先级队列所定最大优先级的消息,优先级以最大优先级对待。对于相同优先级的消息,后进的排在前面。如果在消费者的消费速度大于生产者的速度且Broker 中没有消息堆积的情况下, 对发送的消息设置优先级也就没有什么实际意义。因为生产者刚发送完一条消息就被消费者消费了,那么就相当于Broker 中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。
关于优先级队列,好像违背了队列这种数据结构先进先出的原则,其具体是怎么实现的在这里就不过多讨论。

何时创建队列
从前面的文章我们知道,RabbitMQ可以选择在生产者创建队列,也可以在消费者端创建队列,也可以提前创建好队列,而生产者消费者直接使用即可。
RabbitMQ的消息存储在队列中,交换器的使用并不真正耗费服务器的性能,而队列会。如在实际业务应用中,需要对所创建的队列的流量、内存占用及网卡占用有一个清晰的认知,预估其平均值和峰值,以便在固定硬件资源的情况下能够进行合理有效的分配。
按照RabbitMQ官方建议,生产者和消费者都应该尝试创建(这里指声明操作)队列。这虽然是一个很好的建议,但是在我看来这个时间上没有最好的方案,只有最适合的方案。我们往往需要结合业务、资源等方面在各种方案里面选择一个最适合我们的方案。
如果业务本身在架构设计之初己经充分地预估了队列的使用情况,完全可以在业务程序上线之前在服务器上创建好(比如通过页面管理、RabbitMQ命令或者更好的是从配置中心下发),这样业务程序也可以免去声明的过程,直接使用即可。预先创建好资源还有一个好处是,可以确保交换器和队列之间正确地绑定匹配。很多时候,由于人为因素、代码缺陷等,发送消息的交换器并没有绑定任何队列,那么消息将会丢失:或者交换器绑定了某个队列,但是发送消息时的路由键无法与现存的队列匹配,那么消息也会丢失。当然可以配合mandatory参数或者备份交换器(关于mandatory参数的使用详细可参考我的上一篇文章) 来提高程序的健壮性。与此同时,预估好队列的使用情况非常重要,如果在后期运行过程中超过预定的阈值,可以根据实际情况对当前集群进行扩容或者将相应的队列迁移到其他集群。迁移的过程也可以对业务程序完全透明。此种方法也更有利于开发和运维分工,便于相应资源的管理。如果集群资源充足,而即将使用的队列所占用的资源又在可控的范围之内,为了增加业务程序的灵活性,也完全可以在业务程序中声明队列。至于是使用预先分配创建资源的静态方式还是动态的创建方式,需要从业务逻辑本身、公司运维体系和公司硬件资源等方面考虑。
也就是说,如果预先创建好了队列或者交换器,就不用再queueDeclare或exchangeDeclare了,直接用即可。

》持久化策略

持久化及策略#
作为一个内存中间件,在保证了速度的情况下,不可避免存在如内存数据库同样的问题,即丢失问题。持久化可以提高RabbitMQ 的可靠性,以防在异常情况(重启、关闭、宕机等)下的数据丢失。RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化。

  1. 交换器的持久化
    交换器的持久化是通过在声明队列是将durable 参数置为true 实现的(该参数默认为false)。如果交换器不设置持久化,那么在RabbitMQ 服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器中了。对一个长期使用的交换器来说,建议将其置为持久化的。
  2. 队列的持久化
    队列的持久化是通过在声明队列时将durable 参数置为true 实现的(该参数默认为false),如果队列不设置持久化,那么在RabbitMQ 服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。正所谓”皮之不存,毛将焉附”,队列都没有了,消息又能存在哪里呢?
  3. 消息的持久化
    队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。通过将消息的投递模式(BasicProperties中的DeliveryMode属性)设置为2即可实现消息的持久化。
    因此,消息如果要想在Rabbit重启、关闭、宕机时能够恢复,需要做到以下三点:
  • 把消息的投递模式设置为2
  • 发送到持久化的交换器
  • 到达持久化的队列
    注意:RabbitMQ 确保持久化消息能从服务器重启中恢复的方式是将它们写入磁盘上的一个持久化日志文件中。当发布一条持久化消息到持久化交换器时,Rabbit会在日志提交到日志文件后才发送响应(开启生产者确认机制)。之后,如果消息到了非持久化队列,它会自动从日志文件中删除,并且无法在服务器重启后恢复。因此单单只设置队列持久化,重启之后消息会丢失;单单只设置消息的持久化,重启之后队列消失,继而消息也丢失。单单设置消息持久化而不设置队列的持久化是毫无意义的。当从持久化队列中消费了消息后(并且确认后),RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集。而在消费持久化消息之前,若RabbitMQ服务器重启,会自动重建交换器、队列以及绑定,重播持久化日志文件中的消息到合适的队列或者交换器上(取决于宕机时,消息处在路由的哪个环节)。
    为了保障消息不会丢失,也许我们可以简单粗暴的将所有的消息标记为持久化,但这样我们会付出性能的代价。写入磁盘的速度比写入内存的速度慢得不只一点点。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。在选择是否要将消息持久化时,需要在可靠性和吐吞量之间做一个权衡。
    将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?
  • 从消费者来说,如果在订阅消费队列时将noAck参数设置为true ,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据丢失。
  • 在持久化的消息正确存入RabbitMQ 之后,还需要有一段时间(虽然很短,但是不可忽视〉才能存入磁盘之中。RabbitMQ 并不会为每条消息都进行同步存盘的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内RabbitMQ 服务节点发生了岩机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。
    关于第一个问题,可以通过消费者确认机制来解决。而第二个问题可以通过生产者确认机制来解决,也可以使用镜像队列机制(镜像队列机制,将在运维篇总结)。生产者确认消费者确认请往下看。

》生产者确认机制

上文我们知道,在使用RabbitMQ的时候,可以通过消息持久化操作来解决因为服务器的异常崩溃而导致的消息丢失,除此之外,我们还会遇到一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达服务器。如果在消息到达服务器之前己经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
RabbitMQ针对这个问题,提供了两种解决方式:
  • 通过事务机制实现:
  • 通过发送方确认(publisher confirm)机制实现。

1.RabbitMQ 事务机制#

RabbitMQ 客户端中与事务机制相关的方法有三个:channel.txSelect(用于将当前信道设置为事务模式);channel.txCommit(用于提交事务),channel.txRollback(用于回滚事务)。在通过channel.txSelect方法开启事务之后,我们便可以发布消息给RabbitMQ了,如果事务提交成功,则消息一定到达了RabbitMQ 中,如果在事务提交执行之前由于RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback方法来实现事务回滚。示例代码如下所示:(如果在事务提交后异常,则就算执行rollback也不能回滚了)
1
2
3
4
5
6
7
8
channel.txSelect();
try {
channel.basicPublish("eee", "r1", true, null, "dasd12*(!@*#!".getBytes());
channel.txCommit();
}catch (Exception e){
System.out.println("rollback");
channel.txRollback();
}
事务确实能够解决消息发送方和RabbitMQ之间消息确认的问题,只有消息成功被RabbitMQ接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务同样会带来一些问题。
  • 会阻塞,发布者必须等待broker处理每个消息。
  • 事务是重量级的,每次提交都需要fsync(),需要耗费大量的时间
  • 事务非常耗性能,会降低RabbitMQ的消息吞吐量。

2.发送方确认机制#

前面介绍了RabbitMQ可能会遇到的一个问题,即消息发送方(生产者〉并不知道消息是否真正地到达了RabbitMQ。随后了解到在AMQP协议层面提供了事务机制来解决这个问题,但是采用事务机制实现会严重降低RabbitMQ的消息吞吐量,这里就引入了一种轻量级的方式一发送方确认(publisher confirm)机制。生产者将信道设置成confirm确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID( 从1开始),一旦消息被投递到所有匹配的队列之后(或者是没有合适的队列而被丢弃。总之是消息确确实实到了rabbitmq上),RabbitMQ就会发送一个确认(BasicAck) 给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。(注意,一旦消息到了rabbitmq(即到了队列或者被丢弃)就返回确认,而不是等到被消费)如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
》同步方法:
channel.confirmSelect(); //一定要写明这句,表示将信道设置为确认模式,否则会报错:Confirms not selected
String message = "qazwsx";
channel.basicPublish("eee", "", false, null, message.getBytes());
boolean result = channel.waitForConfirms();
if(result){
System.out.println("arrived");
}else {
System.out.println("no way");
}
》异步方法:
channel.confirmSelect(); ////一定要写明这句,表示将信道设置为确认模式,否则confrimListener的ack/nack方法不会被调用
String message = "qazwsx";
channel.basicPublish("eee", "r1", false, null, message.getBytes());
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("arrived");
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("no way");
}
});
关于生产者确认机制同样会有一些问题,broker(也就是rabbitmq系统,exchange和queue等的统称)不能保证消息会被confirm,只知道将会进行confirm。这样如果broker与生产者之间的连接断开,导致生产者不能收到确认消息,可能会重复进行发布。总之,生产者确认模式给客户端提供了一种较为轻量级的方式,能够跟踪哪些消息被broker处理,哪些可能因为broker宕掉或者网络失败的情况而重新发布。
注意:事务机制和publisher confirm机制两者是互斥的,不能共存。如果企图将已开启事务模式的信道再设置为publisher confirm模式, RabbitMQ会报错,或者如果企图将已开启publisher confirm模式的信道设置为事务模式, RabbitMQ也会报错。在性能上来看,而到底应该选择事务机制还是Confirm机制,则需要结合我们的业务场景。

》消费者确认机制

由于生产者和消费者不直接通信,生产者只负责把消息发送到队列,消费者只负责从队列获取消息(不管是push还是pull)。
消息被”消费”后,是需要从队列中删除的.那怎么确认消息被”成功消费”了呢?
是消费者从队列获取到消息后,broker 就从队列中删除该消息?
那如果消费者收到消息后,还没来得及”消费”它,或者说还没来得及进行业务逻辑处理时,消费者所在的信道或者连接因某种原因断开了,那这条消息岂不是就被无情的抛弃了…
我们更期望的是,消费者从队列获取到消息后,broker 暂时不删除该条消息,等到消费者”成功消费”掉该消息后,再删除它。
所以需要一个机制来确认生产者发送的消息被消费者”成功消费”。
RabbitMQ 提供了一种叫做”消费者确认”的机制.
如上,消费者在消费消息的同时,Rabbit会同步给予消费者一个DeliveryTag,这个DeliveryTag就像我们数据库中的主键,消费者在消费完毕后拿着这个DeliveryTag去Rabbit确认或拒绝这个消息。
消费者确认
消费者确认分两种:自动确认和手动确认.
在自动确认模式中,消息在发送到消费者后即被认为”成功消费”.这种模式可以降低吞吐量(只要消费者可以跟上),以降低交付和消费者处理的安全性.这种模式通常被称为“即发即忘”.与手动确认模型不同,如果消费者的TCP连接或通道在真正的”成功消费”之前关闭,则服务器发送的消息将丢失.因此,自动消息确认应被视为不安全,并不适用于所有工作负载. channel.basicConsume(QUEUE_NAME, true, consumer); //第二个参数是autoAck,为true则证明开启自动确认。
使用自动确认模式时需要考虑的另一件事是消费者过载.手动确认模式通常与有界信道预取(BasicQos方法)一起使用,该预取限制了信道上未完成(“进行中”)的消息的数量.但是,自动确认没有这种限制.因此,消费者可能会被消息的发送速度所淹没,可能会导致消息积压并耗尽堆或使操作系统终止其进程.某些客户端库将应用TCP反压(停止从套接字读取,直到未处理的交付积压超过某个限制).因此,仅建议能够以稳定的速度有效处理消息的消费者使用自动确认模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
QueueingConsumer consumer = new QueueingConsumer(channel);

//自动确认:
channel.basicConsume(QUEUE_NAME, true, consumer);
channel.basicQos(1);
//手动确认:拉模式:只有当broker收到消费确认后,才从队列中删除这一条消息。
channel.basicConsume(QUEUE_NAME, false, consumer);
channel.basicQos(1);
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //发出消费者确认
basicAck(deliveryTag, multiple):
第一个参数deliveryTag
就是接受的消息的deliveryTag,可以通过delivery.getEnvelope().getDeliveryTag()获得
第二个参数 multiple
如果为true,确认之前接受到的消息;如果为false,只确认当前消息。
如果为true就表示连续取得多条消息才发会确认,和计算机网络的中tcp协议接受分组的累积确认十分相似,能够提高效率。

void basicAck(long deliveryTag, boolean multiple);
void basicReject(long deliveryTag, boolean requeue);
void basicNack(long deliveryTag, boolean multiple, boolean requeue);
* deliveryTag:可以看作消息的编号,它是一个64位的长整型值,最大值是9223372036854775807
* requeue:如果requeue 参数设置为true,则RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果requeue 参数设置为false,则RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。
* BasicReject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack这个命令。
* multiple:在BasicAck中,multiple 参数设置为true 则表示确认deliveryTag编号之前所有已被当前消费者确认的消息。在BasicNack中,multiple 参数设置为true 则表示拒绝deliveryTag 编号之前所有未被当前消费者确认的消息。
说明:将channel.BasicReject 或者channel.BasicNack中的requeue设置为false ,可以启用"死信队列"的功能。
上述requeue,都会将消息重新存入队列发送给下一个消费者(也有可能是其它消费者)。关于requeue还有下面一种用法。可以选择是否补发给当前的consumer。
//补发消息 true退回到queue中 /false只补发给当前的consumer
channel.basicRecover(true);
注意:RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。如果忘记了ack,那么后果很严重。当Consumer退出时,Message会重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,这个“内存泄漏”是致命的。

》消息传输保障

消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障分为三个层级。

  • At most once: 最多一次。消息可能会丢失,但绝不会重复传输。
  • At least once: 最少一次。消息绝不会丢失,但可能会重复传输。
  • Exactly once: 恰好一次。每条消息肯定会被传输一次且仅传输一次。
    RabbitMQ 支持其中的”最多一次”和”最少一次”。其中”最少一次”投递实现需要考虑以下这个几个方面的内容:
  1. 消息生产者需要开启事务机制或者publisher confirm 机制,以确保消息可以可靠地传输到RabbitMQ 中。
  2. 消息生产者需要配合使用mandatory参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。
  3. 消息和队列都需要进行持久化处理,以确保RabbitMQ服务器在遇到异常情况时不会造成消息丢失。
  4. 消费者在消费消息的同时需要将autoAck设置为false,然后通过手动确认的方式去确认己经正确消费的消息,以避免在消费端引起不必要的消息丢失。
    “最多一次”的方式就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这样很难确保消息不会重复消费。
    “恰好一次”是RabbitMQ目前无法保障的(目前我也不知道哪个中间件能够保证)。消费者在消费完一条消息之后向RabbitMQ 发送确认BasicAck命令,此时由于网络断开或者其他原因造成RabbitMQ并没有收到这个确认命令,那么RabbitMQ不会将此条消息标记删除。在重新建立连接之后,消费者还是会消费到这一条消息,这就造成了重复消费。再考虑一种情况,生产者在使用publisher confirm机制的时候,发送完一条消息等待RabbitMQ 返回确认通知,此时网络断开,生产者捕获到异常情况,为了确保消息可靠性选择重新发送,这样RabbitMQ中就有两条同样的消息,在消费的时候,消费者就会重复消费。而解决重复消费可以通过消费者幂等等方式来解决。

最近按照默认方法安装brew特别慢,但是想切换从国内源安装,得到的都是老版本的切换方法:

https://segmentfault.com/a/1190000018360813

因为安装brew的脚本现在不是ruby脚本,改成了shell脚本,而且脚本地址也改变了,所以也需要改变一下安装策略:

首先把https://raw.githubusercontent.com/Homebrew/install/master/install.sh上的所有内容拷贝到~目录下的brew_install 文件中(没有就新建)

修改代码(修改为中科大源):

1
2
3
4
5
6
7
8
9
10
11
12
else
HOMEBREW_PREFIX_DEFAULT="/home/linuxbrew/.linuxbrew"
HOMEBREW_CACHE="${HOME}/.cache/Homebrew"

STAT="stat --printf"
CHOWN="/bin/chown"
CHGRP="/bin/chgrp"
GROUP="$(id -gn)"
fi
# BREW_REPO="https://github.com/Homebrew/brew” 修改前
BREW_REPO="git://mirrors.ustc.edu.cn/brew.git” #修改后
CORE_TAP_REPO="git://mirrors.ustc.edu.cn/homebrew-core.git” #新增

修改过后把brew_install文件变为可执行

1
chmod a+x brew_install

然后用执行脚本的方式执行:

1
/bin/bash brew_install

速度简直和外网不是一个level!

但是执行到brew-core还是会巨慢无比,此时可以ctrl-c 可以停掉,

创建目录/usr/local/Homebrew/Library/Taps/homebrew/homebrew-core

并执行

1
git remote set-url origin https://mirrors.ustc.edu.cn/homebrew-core.git

然后重新执行

1
/bin/bash brew_install

本文大部分内容借鉴自https://baijiahao.baidu.com/s?id=1633338040568845450&wfr=spider&for=pc,感谢大神的辛勤劳动!

跳表就是跳跃的表,其实归根结底是好多层的链表,最底层是正常的数据链表,往上走是代表性的索引

越往上走一个索引代表的节点就越多,其实跳表和B* 树有些类似,

B* 树

1

跳表:

越往上走,一个值能代表的范围越广。

都是:如果在当前层大于本节点,小于下一个节点,则往下走,否则继续往前走

跳表。跳表是redis的一个核心组件,也同时被广泛地运用到了各种缓存地实现当中,它的主要优点,就是可以跟红黑树、AVL等平衡树一样,做到比较稳定地插入、查询与删除。理论插入查询删除的算法时间复杂度为O(logN)。
3

什么是跳表

链表,相信大家都不陌生,维护一个有序的链表是一件非常简单的事情,我们都知道,在一个有序的链表里面,查询跟插入的算法复杂度都是O(n)。
4

我们能不能进行优化呢,比如我们一次比较两个呢?那样不就可以把时间缩小一半?
5

同理,如果我们4个4个比,那不就更快了?
6

跳表就是这样的一种数据结构,结点是跳过一部分的,从而加快了查询的速度。跳表跟红黑树又有什么差别呢?既然两者的算法复杂度差不多,为什么Redis要使用跳表而不使用红黑树呢?跳表相对于红黑树,主要有这几个优点:1.代码相对简单,手写个跳表还有可能,手写个红黑树试试?

2.如果我们要查询一个区间里面的值,用平衡树可能会麻烦。这里的麻烦指的是实现和理解上,平衡二叉树查询一段区间也是可以做到的。3.删除一段区间,这个如果是平衡二叉树,就会相当困难,毕竟设计到树的平衡问题,而跳表则没有这种烦恼。好了,相信你对跳表已经有一些认识了,我们来简单介绍平衡二叉树的几个基本操作。

查询

假如我们要查询11,那么我们从最上层出发,发现下一个是5,再下一个是13,已经大于11,所以进入下一层,下一层的一个是9,查找下一个,下一个又是13,再次进入下一层。最终找到11。
7

是不是非常的简单?我们可以把查找的过程总结为一条三目表达式(下一个是否大于结果?下一个:下一层)。理解跳表的查询过程非常重要,试试看查询其他数字,只要你理解了查询,后面两种都非常简单。

插入

插入的时候,首先要进行查询,然后从最底层开始,插入被插入的元素。然后看看从下而上,是否需要逐层插入。可是到底要不要插入上一层呢?我们都知道,我们想每层的跳跃都非常高效,越是平衡就越好(第一层1级跳,第二层2级跳,第3层4级跳,第4层8级跳)。但是用算法实现起来,确实非常地复杂的,并且要严格地按照2地指数次幂,我们还要对原有地结构进行调整。所以跳表的思路是抛硬币,听天由命,产生一个随机数,50%概率再向上扩展,否则就结束。这样子,每一个元素能够有X层的概率为0.5^(X-1)次方。反过来,第X层有多少个元素的数学期望大家也可以算一下。

删除

同插入一样,删除也是先查找,查找到了之后,再从下往上逐个删除。比较简单,就不再赘叙。

跳表和二叉搜索树比较:
8

总结

跳表,用了计算机中一场非常用的解决问题的思路,随机。随机在深度学习与人工智能领域运用得非常的广泛。

mysql执行

1
select @@tx_isolaton;

保错:

ERROR 1193 (HY000): Unknown system variable ‘tx_isolaton’

解决方法:mysql8及以上tx_isolaton更名为transaction_isolation,执行

1
select @@transaction_isolation;

即可解决。

跟着大神的步骤一步一步做,加油!
任务列表:http://blog.51cto.com/ticktick/1956269

在 Android 平台绘制一张图片,使用至少 3 种不同的 API,ImageView,SurfaceView,自定义 View
imageView和surfaceView,为了方便都是直接在activity里面写的函数,直接在onCreateView里面调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import android.content.res.Resources;
import android.graphics.Bitmap;
import android.graphics.BitmapFactory;
import android.graphics.Canvas;
import android.graphics.Matrix;
import android.graphics.Rect;
import android.graphics.drawable.BitmapDrawable;
import android.graphics.drawable.Drawable;
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.view.SurfaceHolder;
import android.view.SurfaceView;
import android.widget.ImageView;

public class MainActivity extends AppCompatActivity {

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
displaySurfaceView();
}

void displayImg(){
final ImageView iv = findViewById(R.id.iv_lu);
//1
// iv.setImageResource(R.mipmap.wechatimg1);

//2
// Drawable drawable = getResources().getDrawable(R.mipmap.wechatimg1, null);
// iv.setImageDrawable(drawable);
//3
Bitmap bitmap = BitmapFactory.decodeResource(getResources(), R.mipmap.wechatimg1);
iv.setImageBitmap(bitmap);
}

void displaySurfaceView(){
final SurfaceView sv = findViewById(com.example.lujianyun.va.R.id.sv_lu);

final SurfaceHolder holder = sv.getHolder();
final Thread t = new Thread(new Runnable() {
@Override
public void run() {
Canvas canvas = holder.lockCanvas();
Bitmap bitmap = BitmapFactory.decodeResource(getResources(), com.example.lujianyun.va.R.mipmap.wechatimg1);
canvas.drawBitmap(bitmap, 0, 0, null);
holder.unlockCanvasAndPost(canvas);
holder.lockCanvas(new Rect(0,0,0,0));
holder.unlockCanvasAndPost(canvas);
}
});

holder.addCallback(new SurfaceHolder.Callback() {
@Override
public void surfaceCreated(SurfaceHolder holder) {
t.start();
}

@Override
public void surfaceChanged(SurfaceHolder holder, int format, int width, int height) {

}

@Override
public void surfaceDestroyed(SurfaceHolder holder) {

}
});



}
}

自定义view

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class DrawView extends View {

String TAG = "DrawView";
Bitmap bitmap;
Bitmap orginBitmap;
boolean bitmapFinished = false;

public DrawView(Context context, @Nullable AttributeSet attrs) {
super(context, attrs);
orginBitmap = BitmapFactory.decodeResource(getResources(), R.mipmap.wechatimg1);


}

@Override
protected void onMeasure(int widthMeasureSpec, int heightMeasureSpec) {
if (!bitmapFinished) { //由于onmeasure要多次调用,所以创建对象的工作只做一次,防止耗时
int w = MeasureSpec.getSize(widthMeasureSpec);
int h = MeasureSpec.getSize(heightMeasureSpec);
bitmap = Bitmap.createScaledBitmap(orginBitmap, w, h, false);
bitmapFinished = true;
}
super.onMeasure(widthMeasureSpec,heightMeasureSpec);
}

@Override
protected void onDraw(Canvas canvas) {
super.onDraw(canvas);
if (bitmap != null)
canvas.drawBitmap(bitmap, 0, 0, null);
}

}

布局文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto"
xmlns:tools="http://schemas.android.com/tools"
android:layout_width="300dp"
android:layout_height="300dp"
tools:context=".MainActivity"
android:clipChildren="false">

<ImageView
android:id="@+id/iv_lu"
android:visibility="gone"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
app:layout_constraintBottom_toBottomOf="parent"
app:layout_constraintLeft_toLeftOf="parent"
app:layout_constraintRight_toRightOf="parent"
app:layout_constraintTop_toTopOf="parent" />

<SurfaceView
android:id="@+id/sv_lu"
android:visibility="gone"
android:layout_width="wrap_content"
android:layout_height="wrap_content" />

<com.example.lll.va.DrawView
android:layout_width="wrap_content"
android:layout_height="wrap_content" />

</LinearLayout>

先贴一个好贴:https://www.cnblogs.com/ALittleDust/p/5935983.html

YUV

YUV是一种颜色空间,基于YUV的颜色编码是流媒体的常用编码方式。Y表示流明,U、V表示色度、浓度,这种表达方式起初是为了彩色电视与黑白电视之间的信号兼容。 对于图像每一点,Y确定其亮度,UV确认其彩度。
Y’CbCr也称为YUV,是YUV的压缩版本,不同之处在于Y’CbCr用于数字图像领域,YUV用于模拟信号领域,MPEG、DVD、摄像机中常说的YUV其实是Y’CbCr,二者转换为RGBA的转换矩阵是不同的。Y’为亮度,Cb、Cr分量代表当前颜色对蓝色和红色的偏移程度。
在这里插入图片描述
Y’=0.5时,Cb、Cr构成的颜色平面
如果输出Y’CbCr三个分量的值,那么会是这样的。
在这里插入图片描述
由上到下依次为Y’、Cb、Cr
为了方便,以下文中YUV特指Y’CbCr。
YUV颜色编码的作用

YUV编码是image/video pipeline的重要组成。比如常用的I420相对于RGB24(RGB三个分量各8个字节)的编码格式,只需要一半的存储容量。在流数据传输时降低了带宽压力。
在这里插入图片描述
YUV颜色编码在video pipeline中的运用
YUV颜色编码格式

YUV色彩编码格式由其色度抽样方式和存储方式决定。
YUV 采样

对光信号采样是把光由模拟信号变为数字信号,这不是我们要做的事,我们要做的是把射线透照产生的模拟图像变为数字图像,两者不是一码事。所以不应该对光信号谈采样定理。
对图像的采样,是把模拟图像变成数字图像,描述模拟图像的是连续的信息,但要把它变成离散的信息,
YUV采样
YUV的一个优点是色度通道可以具有比Y通道更低的采样率而不会显着降低感知质量。 称为A:B:C表示法的符号用于描述U和V相对于Y的采样频率:

  • 4:4:4表示没有色度通道的下采样。
  • 4:2:2表示2:1水平下采样,没有垂直下采样。 对于每两个U或V样本,每条扫描线包含四个Y样本。
  • 4:2:0表示2:1水平下采样,2:1垂直下采样。
  • 4:1:1表示4:1水平下采样,没有垂直下采样。 每个扫描线包含每个U或V样本的四个Y样本。 4:1:1采样不如其他格式常见,本文不再详细讨论。
    图1显示了4:4:4图片中使用的采样网格。 Luma样本由十字表示,色度样本由圆表示。
    Figure 1 展示了 4:4:4 格式的图片所用的采样网格. Luma samples are represented by a cross, and chroma samples are represented by a circle.(亮度信息是X,色度信息是O)
    在这里插入图片描述
    Figure 1. YUV 4:4:4 sample positions
    4:2:2采样的主要形式在ITU-R建议书BT.601中定义。 Figure 2 shows the sampling grid defined by this standard.
    在这里插入图片描述
    Figure 2. YUV 4:2:2 sample positions
    有两种常见的4:2:0采样变体。 其中一个用于MPEG-2视频,另一个用于MPEG-1和ITU-T建议H.261和H.263。 图3显示了MPEG-1方案中使用的采样网格,图4显示了MPEG-2方案中使用的采样网格。(视频使用的采样网格,就是视频中每一帧(一张图片)的采样网格)
    在这里插入图片描述
    Figure 3. YUV 4:2:0 sample positions (MPEG-1 scheme)
    在这里插入图片描述
    Figure 4. YUV 4:2:0 sample positions (MPEG-2 scheme)
    与MPEG-1方案相比,在MPEG-2方案和为4:2:2和4:4:4格式定义的采样网格之间进行转换更为简单。 因此,MPEG-2方案在Windows中是首选,应被视为4:2:0格式的默认解释。
    先记住下面这段话,以后提取每个像素的YUV分量会用到。
    (每一个像素对应一个Y)
  1. YUV 4:4:4采样,每一个Y对应一组UV分量。(每个像素对应一组UV)
  2. YUV 4:2:2采样,每两个Y共用一组UV分量。 (两个像素共用一组UV)
  3. YUV 4:2:0采样,每四个Y共用一组UV分量(专指12bit/pixel的格式)。(四个像素共用一组UV)
    4:2:0采样还有每四个Y共用2组UV分量(为16bit[pixel的格式)
    YUV存储方式

宏像素(macropixel):一组包含了若干个亮度,色度(可能还有透明度)的数据组。有可能一个像素对应一个宏像素,也有可能几个像素共用一个宏像素

YUV存储像素信息有两种格式:
packed:Y,U,V的信息存储在同一个存储平面中(或者其中的两个存储在同一数组中),像素被组织成一组宏像素,,布局取决于格式
planar:Y,U,V 被存储为3个不同平面里
简而言之:packed就是宏像素混合存储,planar就是每种宏像素分开存储

存储平面(surface):一个抽象的平面表示了像素存储的方式,其中的单位是一个宏像素
原点:屏幕左上角
步幅stride:存储平面的宽度,始终为正
对齐:surface字对齐
4:4:4 格式, 32 Bits per Pixel

单个4:4:4格式,使用FOURCC代码AYUV。 这是一种打包格式,其中每个像素被编码为四个连续字节,按以下顺序排列。(每个像素对应一个宏像素,如像素p0对应宏像素 V0U0Y0A0)

在这里插入图片描述
Figure 5. AYUV memory layout
标为A的字节代表透明度
4:2:2 Formats, 16 Bits per Pixel
两种 4:2:2 格式, 编码方式如下:

  • YUY2
  • UYVY
    都是packed格式, 其中每个宏像素用于两个像素,编码为四个连续的字节。 这导致色度的水平下采样两倍。
    YUY2

在YUY2格式中,数据可以被视为无符号字符值的数组,其中第一个字节包含第一个Y样本,第二个字节包含第一个U(Cb)样本,第三个字节包含第二个Y样本,以及 第四个字节包含第一个V(Cr)样本,如图6(每两个像素对应一个宏像素,如像素p0,p1对应宏像素 Y0U0Y1V0, 第一个像素的显示信息为Y0U0V0,第二个像素的显示信息为Y1U0V0。相当于32个bit存储了2像素的信息,就叫做每个像素16bit,但实际上每个像素的完整信息仍需要12bit,只是其中8bit与另一个共用)

在这里插入图片描述Figure 6. YUY2 memory layout
如果图像被放置为两个小端WORD值的数组,则第一个WORD在最低有效位(LSB)中包含Y0,在最高有效位(MSB)中包含U. 第二个WORD在LSB中包含Y1,在MSB中包含V.
YUY2是MicrosoftDirectX®视频加速(DirectX VA)的首选4:2:2像素格式。 预计这将是支持4:2:2视频的DirectX VA加速器的中期要求。
UYVY
这种格式与YUY2相同,只是字节顺序颠倒了 - 也就是说,色度和亮度字节被翻转(图7)。 如果图像被寻址为两个小端WORD值的数组,则第一个WORD在LSB中包含U,在MSB中包含Y0,第二个WORD在LSB中包含V,在MSB中包含Y1。(每两个像素对应一个宏像素,如像素p0,p1对应宏像素 Y0U0Y1V0, 第一个像素的显示信息为Y0U0V0,第二个像素的显示信息为Y1U0V0)
在这里插入图片描述
Figure 7. UYVY memory layout
4:2:0 Formats, 16 Bits per Pixel

两种4:2:0 格式,16位/像素,编码方式如下:

  • IMC1
  • IMC3
    这两种都是planar格式. 色度通道在水平和垂直维度上次采样系数为2
    IMC1
    所有的Y首先在内存中存储, 就像一个无符号 char 值类型的数组. 紧接着是 V (Cr) 样本, 然后是U (Cb) 样本. V 和 U 平面的宽度和Y平面的宽度一样, 所以会导致一些没有用到的内存, 如图8. (4个像素对应一个宏像素, p0,p1,p2,p3对应 Y0Y1Y2Y3V0V1U0U1,p0的显示信息为Y0V0U0,p1的显示信息为Y1V0U0,p2的显示信息为Y2V1U1,p3的显示信息为Y3V1U1,U,V可能不是这么安排的。每个像素对应一个Y,对应一种U,V的组合,4个像素分别用各自的亮度信息,共用4个色度信息,平均每个像素使用16bit)
    在这里插入图片描述
    Figure 8. IMC1 memory layout
    IMC3

此格式与IMC1相同,但U和V平面交换次序:
在这里插入图片描述
Figure 9. IMC3 memory layout
4:2:0 Formats, 12 Bits per Pixel

4种 4:2:0 12bit/pixel 格式 ,存储方式如下(4个像素用4个Y,1个U,一个V)

  • IMC2
  • IMC4
  • YV12
  • NV12
    在这些所有格式中,色度通道在水平和垂直维度上次采样系数为2

IMC2
该格式与IMC1相同,除了V(Cr)和U(Cb)线在半步边界处交错。 换句话说,色度区域中的每个全步幅线以一行V样本开始,接着从下一个半步幅边界开始是一行U样本,(图10)。 这种布局比IMC1更有效地使用地址空间。 它将色度地址空间减半,因此总地址空间减少了25%。 在4:2:0格式中,IMC2是仅次于NV12的第二好的格式。
在这里插入图片描述
Figure 10. IMC2 memory layout
IMC4

此格式与IMC2相同,但U(Cb)和V(Cr)行交换:
在这里插入图片描述
Figure 11. IMC4 memory layout
YV12

所有Y样本首先在内存中显示为无符号char值的数组。 该阵列紧接着是所有V(Cr)样本。 V平面的步幅是Y平面的一半,V平面包含Y平面一半的行数。 紧接着V平面的所有U(Cb)样本,具有与V平面相同的步幅和行数(图12)。
在这里插入图片描述
Figure 12. YV12 memory layout
NV12

所有Y样本首先在内存中存储,Y平面是无符号char值的数组,共有偶数行。 Y平面后面紧跟着一个无符号字符值数组,其中包含打包的U(Cb)和V(Cr)样本,如图13所示。当组合的UV数组作为小端WORD值的数组进行寻址时, LSB包含U值,MSB包含V值。 NV12是DirectX VA的首选4:2:0像素格式。 预计这将是支持4:2:0视频的DirectX VA加速器的中期要求。
在这里插入图片描述
Figure 13. NV12 memory layout
I420多用于传输

摄像头采集得到的数据是NV12
在这里插入图片描述

任务:2. 在 Android 平台使用 AudioRecord 和 AudioTrack API 完成音频 PCM 数据的采集和播放,并实现读写音频 wav 文件

记得加权限:

1
2
3
<uses-permission android:name="android.permission.RECORD_AUDIO"/>
<uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE"/>
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE"/>

AudioRecord使用流程:
确定好采样率,通道类型,编码方式,录制来源
由AudioRecord.getMinbufferSize获得能接受的最小buffer大小
new一个AudioRecord对象 audioRecord ,第一个参数表示的是音频录制来源。注意的是record和track都有通道(channel)参数,但一个是in一个是out,不要搞反了
开始录制:audioRecord.startRecord() 同时要开启一个线程,该线程所做的工作是打开一个output流创建一个文件。不断用audioRecord.read把录制的数据写到一个比特数组,然后把比特数组写到output流中
暂停录制:audioRecord.stop,但不release,而且output流也停止写入,但不释放
恢复录制,audioRecord.startRecord() ,继续读取audioRecord的数据并写入output流,
停止录制:audioRecord.stop,audioRecord.release释放资源,output流flush一下然后关闭
这样录制的音频是pcm格式的,也就是没有文件头的原始文件,要转成wav文件,只需要保持数据部分不变,然后再文件开始加入wav的文件头
AudioTrack使用流程:
确定好采样率,通道类型,编码方式
由AudioTrack.getMinbufferSize获得能接受的最小buffer大小
new一个AudioTrack对象 audioTrack ,第一个参数表示的是音频播放的类型,可选的有演讲、音乐、影视等(可能是针对每种不同的场景类型做一些优化) 。最后一个参数mode是表示创建的模式
MODE_STATIC 其中音频数据从Java传输到本地层仅一次,然后音频开始播放。
MODE_STREAM 当音频播放时,音频数据从Java流到本地层。(两个先后顺序不同)
开始播放:audioTrack.play() 同时要开启一个线程,该线程所做的工作是打开一个input流读取音频文件(audioTrack既可以读原始的pcm文件,也可以读加入文件头的wav文件), 不断把文件到一个比特数组,然后用audioTrack.write把比特数组写入.
暂停播放:audioTrack.stop,但不release,而且input流也停止读入,但不释放(文件指针还停留在此)
恢复播放,audioTrack.play,让input流在停止的地方继续入读,
停止播放:audioTrack.stop,audioTrack.release释放资源,关闭input流(这样input流保留的文件指针也就丢失了,下一次就会从头读)
AudioRecord的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import android.app.Activity;
import android.media.AudioFormat;
import android.media.AudioRecord;
import android.media.MediaRecorder;
import android.os.Environment;
import android.util.Log;
import android.view.View;
import android.widget.Button;

import com.example.lll.va.R;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

public class AudioRecorder {

private String tag = "AudioRecorder";
//音频输入-麦克风
private final static int AUDIO_INPUT = MediaRecorder.AudioSource.MIC;
//采用频率
//44100是目前的标准,但是某些设备仍然支持22050,16000,11025
//采样频率一般共分为22.05KHz、44.1KHz、48KHz三个等级
private final static int AUDIO_SAMPLE_RATE = 16000;
//声道 单声道
private final static int AUDIO_CHANNEL = AudioFormat.CHANNEL_IN_MONO;
//编码
private final static int AUDIO_ENCODING = AudioFormat.ENCODING_PCM_16BIT;

private byte data[];
private boolean isRecording = false;

private AudioRecord audioRecord = null; // 声明 AudioRecord 对象
private int recordBufSize = 0; // 声明recoordBufffer的大小字段
private Button btnStart;
private Button btnStop;

public static void startTask2byAudioRecord(Activity activity){
final AudioRecorder audioRecorder = new AudioRecorder();
audioRecorder.createAudioRecord();
audioRecorder.btnStart = activity.findViewById(R.id.btn_start_record_audio);
audioRecorder.btnStop = activity.findViewById(R.id.btn_stop_record_audio);
audioRecorder.btnStart.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
audioRecorder.startRecord();
}
});
audioRecorder.btnStop.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
audioRecorder.stopRecord();
}
});
}

public void createAudioRecord() {
recordBufSize = AudioRecord.getMinBufferSize(AUDIO_SAMPLE_RATE,
AUDIO_CHANNEL, AUDIO_ENCODING); //audioRecord能接受的最小的buffer大小
audioRecord = new AudioRecord(AUDIO_INPUT, AUDIO_SAMPLE_RATE, AUDIO_CHANNEL, AUDIO_ENCODING, recordBufSize);
data = new byte[recordBufSize];
}

public void startRecord() {
audioRecord.startRecording();
isRecording = true;
thread_w.start();
}

private String filename = Environment.getExternalStorageDirectory() + "/test";
Thread thread_w = new Thread(new Runnable() {
@Override
public void run() {

FileOutputStream os = null;

try {
os = new FileOutputStream(filename);
} catch (FileNotFoundException e) {
e.printStackTrace();
}

if (null != os) {
Log.d(tag, "isRecording = " + isRecording);
while (isRecording) {
int read = audioRecord.read(data, 0, recordBufSize);
Log.d(tag, "read size = " + read);
// 如果读取音频数据没有出现错误,就将数据写入到文件
if (AudioRecord.ERROR_INVALID_OPERATION != read) {
try {
os.write(data);
Log.d(tag, "os writr = " + data);
} catch (IOException e) {
e.printStackTrace();
}
}
}

try {
os.flush();
os.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
});

public void stopRecord() {
isRecording = false;
audioRecord.stop();
audioRecord.release();
// thread_w = null;
PcmToWavUtil util = new PcmToWavUtil(AUDIO_SAMPLE_RATE, AUDIO_CHANNEL, AUDIO_ENCODING);
util.pcmToWav(filename, filename + ".wav");
}
}

给生成的pcm文件加入wav文件头的工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import android.media.AudioFormat;
import android.media.AudioRecord;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class PcmToWavUtil {

/**
* 缓存的音频大小
*/
private int mBufferSize;
/**
* 采样率
*/
private int mSampleRate;
/**
* 声道数
*/
private int mChannel;


/**
* @param sampleRate sample rate、采样率
* @param channel channel、声道
* @param encoding Audio data format、音频格式
*/
PcmToWavUtil(int sampleRate, int channel, int encoding) {
this.mSampleRate = sampleRate;
this.mChannel = channel;
this.mBufferSize = AudioRecord.getMinBufferSize(mSampleRate, mChannel, encoding);
}


/**
* pcm文件转wav文件
*
* @param inFilename 源文件路径
* @param outFilename 目标文件路径
*/
public void pcmToWav(String inFilename, String outFilename) {
FileInputStream in;
FileOutputStream out;
long totalAudioLen;
long totalDataLen;
long longSampleRate = mSampleRate;
int channels = mChannel == AudioFormat.CHANNEL_IN_MONO ? 1 : 2;
long byteRate = 16 * mSampleRate * channels / 8;
byte[] data = new byte[mBufferSize];
try {
in = new FileInputStream(inFilename);
out = new FileOutputStream(outFilename);
totalAudioLen = in.getChannel().size();
totalDataLen = totalAudioLen + 36;

writeWaveFileHeader(out, totalAudioLen, totalDataLen,
longSampleRate, channels, byteRate);
while (in.read(data) != -1) {
out.write(data);
}
in.close();
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}


/**
* 加入wav文件头
*/
private void writeWaveFileHeader(FileOutputStream out, long totalAudioLen,
long totalDataLen, long longSampleRate, int channels, long byteRate)
throws IOException {
byte[] header = new byte[44];
// RIFF/WAVE header
header[0] = 'R';
header[1] = 'I';
header[2] = 'F';
header[3] = 'F';
header[4] = (byte) (totalDataLen & 0xff);
header[5] = (byte) ((totalDataLen >> 8) & 0xff);
header[6] = (byte) ((totalDataLen >> 16) & 0xff);
header[7] = (byte) ((totalDataLen >> 24) & 0xff);
//WAVE
header[8] = 'W';
header[9] = 'A';
header[10] = 'V';
header[11] = 'E';
// 'fmt ' chunk
header[12] = 'f';
header[13] = 'm';
header[14] = 't';
header[15] = ' ';
// 4 bytes: size of 'fmt ' chunk
header[16] = 16;
header[17] = 0;
header[18] = 0;
header[19] = 0;
// format = 1
header[20] = 1;
header[21] = 0;
header[22] = (byte) channels;
header[23] = 0;
header[24] = (byte) (longSampleRate & 0xff);
header[25] = (byte) ((longSampleRate >> 8) & 0xff);
header[26] = (byte) ((longSampleRate >> 16) & 0xff);
header[27] = (byte) ((longSampleRate >> 24) & 0xff);
header[28] = (byte) (byteRate & 0xff);
header[29] = (byte) ((byteRate >> 8) & 0xff);
header[30] = (byte) ((byteRate >> 16) & 0xff);
header[31] = (byte) ((byteRate >> 24) & 0xff);
// block align
header[32] = (byte) (2 * 16 / 8);
header[33] = 0;
// bits per sample
header[34] = 16;
header[35] = 0;
//data
header[36] = 'd';
header[37] = 'a';
header[38] = 't';
header[39] = 'a';
header[40] = (byte) (totalAudioLen & 0xff);
header[41] = (byte) ((totalAudioLen >> 8) & 0xff);
header[42] = (byte) ((totalAudioLen >> 16) & 0xff);
header[43] = (byte) ((totalAudioLen >> 24) & 0xff);
out.write(header, 0, 44);
}
}

AudioTrack的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import android.app.Activity;
import android.content.Context;
import android.media.AudioAttributes;
import android.media.AudioFormat;
import android.media.AudioManager;
import android.media.AudioRecord;
import android.media.AudioTrack;
import android.media.MediaRecorder;
import android.os.Environment;
import android.os.Handler;
import android.os.Message;
import android.util.Log;
import android.view.View;
import android.widget.Button;

import com.example.lll.va.R;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;

public class AudioTracker {
private String tag = "AudioTracker";
//采用频率
private final static int AUDIO_SAMPLE_RATE = 16000;
//声道 单声道
private final static int AUDIO_CHANNEL = AudioFormat.CHANNEL_OUT_MONO;
//编码
private final static int AUDIO_ENCODING = AudioFormat.ENCODING_PCM_16BIT;
private AudioTrack audioTrack;
private Activity activity;
private int buffersize;
private byte[] data;
private boolean isPlay = false;
private boolean isFirstPlay = true; //用firstPlay和isPlay组合,可以完成暂停和继续播放的功能

public static void startTask2byAudioTrack(Activity activity) {
final AudioTracker audioTracker = new AudioTracker();
audioTracker.activity = activity;
Button btnPlay = activity.findViewById(R.id.btn_play_audio);
Button btnStop = activity.findViewById(R.id.btn_stop_audio);
Button btnPause = activity.findViewById(R.id.btn_pause_audio);
btnPlay.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
audioTracker.startPlay();
}
});
btnStop.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
audioTracker.stopPlay();
}
});
btnPause.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
audioTracker.pause();
}
});

}

//AudioAttributes取代了流类型的概念(例如参见AudioManager.STREAM_MUSIC或AudioManager.STREAM_ALARM),
// 用于定义音频播放的行为。 通过允许应用程序定义,属性允许应用程序指定比在流类型中传达的信息更多的信息:


public void initAudioTrack() {
if (isFirstPlay) {
buffersize = AudioTrack.getMinBufferSize(AUDIO_SAMPLE_RATE,
AUDIO_CHANNEL, AUDIO_ENCODING); //audioTracker能接受的最小的buffer大小
audioTrack = new AudioTrack(AudioAttributes.CONTENT_TYPE_MUSIC, AUDIO_SAMPLE_RATE, AUDIO_CHANNEL
, AudioFormat.ENCODING_PCM_16BIT, buffersize, AudioTrack.MODE_STREAM);
data = new byte[buffersize];
}

}

public void startPlay() {
initAudioTrack();
audioTrack.play();
isPlay = true;
playThread.start();

}

public void pause() {
audioTrack.stop();
isPlay = false;
Log.d(tag, "pasue ");
}

public void stopPlay() {
audioTrack.stop();
isPlay = false;
isFirstPlay = true;
audioTrack.release();
Log.d(tag, "stop ");
is = null;
}

FileInputStream is = null;
String fileName = Environment.getExternalStorageDirectory() + "/test.wav";
Thread playThread = new Thread(new Runnable() {
@Override
public void run() {
File file = new File(fileName);

try {
if (isFirstPlay) { //如果是首次播放(停止后再播放也算首次)则初始化流
is = new FileInputStream(file);
isFirstPlay = false;
}
Log.d(tag, "is = " + is);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
writeData();
}
});

private void writeData() {
if (is == null) return;
while (isPlay) {
try {
int read = is.read(data);
Log.d(tag, "read = " + read);
if (read != 0 && read != -1) {
audioTrack.write(data, 0, read);
} else {
stopPlay();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

布局就是很简单的layout和几个分别控制开始、暂停、结束的button,就不贴了