在JobTracker中,有这样的一个属性queueManager,它的类型是org.apache.Hadoop.mapred.QueueManager,那么它被JobTracker用来干啥的呢?在上一篇博文中,我们已经大概知道了,JobTracker使用QueueManager来验证一个用户对Job是否具有某些操作权限,如提交等。所以在本文,我将集中讨论QueueManager类。
先来看看QueueManager的类图:
简单的介绍一下这几个属性:
queueNames:存放jobs所属的queues的名字(通过配置文件中的mapred.queues.names配置);
aclsMap:存放每一个queue每一个operation的ACLs;
aclsEnabed:是否开启ACL验证(通过配置文件中的mapred.acls.enabled配置);
schedulerInfoObjects:存放job队列的调度信息;
从这几个属性,我们可以清楚的看出JobTracker是按照queue来管理job的操作控制权限,调度Job的执行的。先来看看QueueManager是如何通过配置文件来初始化的:
private void initialize(Configuration conf) {
aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
String[] queues = conf.getStrings("mapred.queue.names", new String[] {JobConf.DEFAULT_QUEUE_NAME});
addToSet(queueNames, queues);
// for every queue, and every operation, get the ACL if any is specified and store in aclsMap.
for (String queue : queues) {
for (QueueOperation oper : QueueOperation.values()) {
String key = toFullPropertyName(queue, oper.getAclName());
String aclString = conf.get(key, "*");
aclsMap.put(key, new AccessControlList(aclString));
}
}
}
这个初始化是很简单的,不如来看一个完整的配置例子吧!
从配置文中我们可以得知,Job的操作类型有两种类型:acl-submit-job和acl-administer-jobs,它们对应的对象是:
static enum QueueOperation{
SUBMIT_JOB ("acl-submit-job", false),
ADMINISTER_JOBS ("acl-administer-jobs", true);
private final String aclName;
private final boolean jobOwnerAllowed;
QueueOperation(String aclName, boolean jobOwnerAllowed) {
this.aclName = aclName;
this.jobOwnerAllowed = jobOwnerAllowed;
}
final String getAclName() {
return aclName;
}
final boolean isJobOwnerAllowed() {
return jobOwnerAllowed;
}
}
它们的ACL由用户和用户组组成,用户和用户组之间用空格区分,用户之间、用户组之间用逗号区分。同时ACL可以是一个*,它表示任何用户以及任何用户组都有这个操作权限。
下面我们再来看看QueueManager是如何来对客户端进行Job的访问控制的,其实这个过程也很简单,直接看源码实现:
public synchronized boolean hasAccess(String queueName, JobInProgress job,
QueueOperation oper, UserGroupInformation ugi) {
if (!aclsEnabled) {//是否关闭了ACL验证
return true;
}
if (oper.isJobOwnerAllowed()) {
if (job.getJobConf().getUser().equals(ugi.getUserName())) return true;
}
AccessControlList acl = aclsMap.get(toFullPropertyName(queueName, oper.getAclName()));//获取对应的ACLS
if (acl == null) {
return false;
}
// Check the ACL list
boolean allowed = acl.allAllowed();
if (!allowed) {
if (acl.getUsers().contains(ugi.getUserName())) allowed = true;
else {
Set<String> allowedGroups = acl.getGroups();//匹配用户名
for (String group : ugi.getGroupNames()) {
if (allowedGroups.contains(group)) {//匹配用户组
allowed = true;
break;
}
}
}
}
return allowed;
}