Elasticsearch系列---实现分布式锁 (2)

响应结果:

{ "_index": "files-lock", "_type": "lock", "_id": "1", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "_seq_no": 0, "_primary_term": 1 }

线程1、线程2查询锁信息

{ "_index": "files-lock", "_type": "lock", "_id": "1", "_version": 1, "found": true, "_source": { "process_id": "181ab3ee-28cc-4339-ba35-69802e06fe42" } }

线程2传入的process_id为181ab3ee-28cc-4339-ba35-69802e06fe42,尝试加锁,失败,此时应该启动重试机制

POST /files-lock/lock/1/_update { "upsert": { "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5" }, "script": { "id": "document-lock", "params": { "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5" } } }

提示该文档已经被别的线程(线程1)锁住了,你不能更新了,响应报文如下:

{ "error": { "root_cause": [ { "type": "remote_transport_exception", "reason": "[node-1][192.168.17.137:9300][indices:data/write/update[s]]" } ], "type": "illegal_argument_exception", "reason": "failed to execute script", "caused_by": { "type": "script_exception", "reason": "runtime error", "painless_class": "java.lang.String", "to_string": "already locked by other thread", "java_class": "java.lang.String", "script_stack": [ "Debug.explain('already locked by other thread'); } ", " ^---- HERE" ], "script": "judge-lock", "lang": "painless", "caused_by": { "type": "painless_explain_error", "reason": null } } }, "status": 400 }

线程1执行事务方法

POST /files/file/1/_update { "doc": { "name":"README1.txt" } }

线程1的事务方法执行完成,并通过删除id为1的文档,相当于释放锁

DELETE /files-lock/lock/1

线程2在线程1执行事务的期间,一直在模拟挂起,重试的操作,直到线程1完成释放锁,然后线程2加锁成功

POST /files-lock/lock/1/_update { "upsert": { "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5" }, "script": { "id": "document-lock", "params": { "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5" } } }

结果:

{ "_index": "files-lock", "_type": "lock", "_id": "1", "_version": 3, "found": true, "_source": { "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5" } }

此时锁的process_id变成线程2传入的"a6d13529-86c0-4422-b95a-aa0a453625d5"

{ "_index": "files-lock", "_type": "lock", "_id": "1", "_version": 3, "found": true, "_source": { "process_id": "a6d13529-86c0-4422-b95a-aa0a453625d5" } }

这样基于ES的行锁操作控制过程就完成了。

脚本解释

update+upsert操作,如果该记录没加锁(此时document为空),执行upsert操作,设置process_id,如果已加锁,执行script
script内的逻辑是:判断传入参数与当前doc的process_id,如果不相等,说明有别的线程尝试对有锁的doc进行加锁操作,Debug.explain表示抛出一个异常。

process_id可以由Java应用系统里生成,如UUID。

如果两个process_id相同,说明当前执行的线程与加锁的线程是同一个,ctx.op = 'noop'表示什么都不做,返回成功的响应,Java客户端拿到成功响应的报文,就可以继续下一步的操作,一般这里的下一步就是执行事务方法。

点评

文档级别的锁颗粒度小,并发性高,吞吐量大,类似于数据库的行锁。

共享锁与排他锁 概念

共享锁:允许多个线程获取同一条数据的共享锁进行读操作
排他锁:同一条数据只能有一个线程获取排他锁,然后进行增删改操作

互斥性:共享锁与排他锁是互斥的,如果这条数据有共享锁存在,那么排他锁无法加上,必须得共享锁释放完了,排他锁才能加上。
反之也成立,如果这条数据当前被排他锁锁信,那么其他的排他锁不能加,共享锁也加不上。必须等这个排他锁释放完了,其他锁才加得上。
有人在改数据,就不允许别人来改,也不让别人来读。

读写锁的分离
如果只是读数据,每个线程都可以加一把共享锁,此时该数据的共享锁数量一直递增,如果这时有写数据的请求(写请求是排他锁),由于互斥性,必须等共享锁全部释放完,写锁才加得上。
有人在读数据,就不允许别人来改。

案例实验

我们先创建一个共享锁的脚本:

# 读操作加锁脚本 POST _scripts/rw-lock { "script": { "lang": "painless", "source": "if (ctx._source.lock_type == 'exclusive') { Debug.explain('one thread is writing data, the lock is exclusive now'); } ctx._source.lock_count++" } } # 读操作完毕释放锁脚本 POST _scripts/rw-unlock { "script": { "lang": "painless", "source": "if ( --ctx._source.lock_count == 0) { ctx.op = 'delete' }" } }

每次有一个线程读数据时,执行一次加锁操作

POST /files-lock/lock/1/_update { "upsert": { "lock_type": "shared", "lock_count": 1 }, "script": { "id": "rw-lock" } }

在多个页面上尝试,可以看到lock_count在逐一递增,模拟多个线程同时读一个文档的操作。

在有线程读文档,还未释放的情况下,尝试对该文档加一个排他锁

PUT /files-lock/lock/1/_create { "lock_type": "exclusive" }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdpdw.html