GVKun编程网logo

聊聊 dubbo 的 ConcurrentHashSet(dubbo consul)

13

对于聊聊dubbo的ConcurrentHashSet感兴趣的读者,本文将会是一篇不错的选择,我们将详细介绍dubboconsul,并为您提供关于com.intellij.util.container

对于聊聊 dubbo 的 ConcurrentHashSet感兴趣的读者,本文将会是一篇不错的选择,我们将详细介绍dubbo consul,并为您提供关于com.intellij.util.containers.ConcurrentHashSet的实例源码、concurrentHashMap、ConcurrentHashMap computeIfAbsent、ConcurrentHashMap 、HashTable的有用信息。

本文目录一览:

聊聊 dubbo 的 ConcurrentHashSet(dubbo consul)

聊聊 dubbo 的 ConcurrentHashSet(dubbo consul)

本文主要研究一下 dubbo 的 ConcurrentHashSet

ConcurrentHashSet

dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ConcurrentHashSet.java

public class ConcurrentHashSet<E> extends AbstractSet<E> implements Set<E>, java.io.Serializable {

    private static final long serialVersionUID = -8672117787651310382L;

    private static final Object PRESENT = new Object();

    private final ConcurrentMap<E, Object> map;

    public ConcurrentHashSet() {
        map = new ConcurrentHashMap<E, Object>();
    }

    public ConcurrentHashSet(int initialCapacity) {
        map = new ConcurrentHashMap<E, Object>(initialCapacity);
    }

    /**
     * Returns an iterator over the elements in this set. The elements are
     * returned in no particular order.
     *
     * @return an Iterator over the elements in this set
     * @see ConcurrentModificationException
     */
    @Override
    public Iterator<E> iterator() {
        return map.keySet().iterator();
    }

    /**
     * Returns the number of elements in this set (its cardinality).
     *
     * @return the number of elements in this set (its cardinality)
     */
    @Override
    public int size() {
        return map.size();
    }

    /**
     * Returns <tt>true</tt> if this set contains no elements.
     *
     * @return <tt>true</tt> if this set contains no elements
     */
    @Override
    public boolean isEmpty() {
        return map.isEmpty();
    }

    /**
     * Returns <tt>true</tt> if this set contains the specified element. More
     * formally, returns <tt>true</tt> if and only if this set contains an
     * element <tt>e</tt> such that
     * <tt>(o==null&nbsp;?&nbsp;e==null&nbsp;:&nbsp;o.equals(e))</tt>.
     *
     * @param o element whose presence in this set is to be tested
     * @return <tt>true</tt> if this set contains the specified element
     */
    @Override
    public boolean contains(Object o) {
        return map.containsKey(o);
    }

    /**
     * Adds the specified element to this set if it is not already present. More
     * formally, adds the specified element <tt>e</tt> to this set if this set
     * contains no element <tt>e2</tt> such that
     * <tt>(e==null&nbsp;?&nbsp;e2==null&nbsp;:&nbsp;e.equals(e2))</tt>. If this
     * set already contains the element, the call leaves the set unchanged and
     * returns <tt>false</tt>.
     *
     * @param e element to be added to this set
     * @return <tt>true</tt> if this set did not already contain the specified
     * element
     */
    @Override
    public boolean add(E e) {
        return map.put(e, PRESENT) == null;
    }

    /**
     * Removes the specified element from this set if it is present. More
     * formally, removes an element <tt>e</tt> such that
     * <tt>(o==null&nbsp;?&nbsp;e==null&nbsp;:&nbsp;o.equals(e))</tt>, if this
     * set contains such an element. Returns <tt>true</tt> if this set contained
     * the element (or equivalently, if this set changed as a result of the
     * call). (This set will not contain the element once the call returns.)
     *
     * @param o object to be removed from this set, if present
     * @return <tt>true</tt> if the set contained the specified element
     */
    @Override
    public boolean remove(Object o) {
        return map.remove(o) == PRESENT;
    }

    /**
     * Removes all of the elements from this set. The set will be empty after
     * this call returns.
     */
    @Override
    public void clear() {
        map.clear();
    }

}
  • ConcurrentHashSet 继承了 AbstractSet,实现了 Set、Serializable 接口;它底层使用 ConcurrentHashMap 来实现,其 value 固定为 PRESENT

实例

dubbo-2.7.2/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/extension/SpringExtensionFactory.java

public class SpringExtensionFactory implements ExtensionFactory {
    private static final Logger logger = LoggerFactory.getLogger(SpringExtensionFactory.class);

    private static final Set<ApplicationContext> CONTEXTS = new ConcurrentHashSet<ApplicationContext>();
    private static final ApplicationListener SHUTDOWN_HOOK_LISTENER = new ShutdownHookListener();

    public static void addApplicationContext(ApplicationContext context) {
        CONTEXTS.add(context);
        if (context instanceof ConfigurableApplicationContext) {
            ((ConfigurableApplicationContext) context).registerShutdownHook();
            DubboShutdownHook.getDubboShutdownHook().unregister();
        }
        BeanFactoryUtils.addApplicationListener(context, SHUTDOWN_HOOK_LISTENER);
    }

    public static void removeApplicationContext(ApplicationContext context) {
        CONTEXTS.remove(context);
    }

    public static Set<ApplicationContext> getContexts() {
        return CONTEXTS;
    }

    // currently for test purpose
    public static void clearContexts() {
        CONTEXTS.clear();
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getExtension(Class<T> type, String name) {

        //SPI should be get from SpiExtensionFactory
        if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
            return null;
        }

        for (ApplicationContext context : CONTEXTS) {
            if (context.containsBean(name)) {
                Object bean = context.getBean(name);
                if (type.isInstance(bean)) {
                    return (T) bean;
                }
            }
        }

        logger.warn("No spring extension (bean) named:" + name + ", try to find an extension (bean) of type " + type.getName());

        if (Object.class == type) {
            return null;
        }

        for (ApplicationContext context : CONTEXTS) {
            try {
                return context.getBean(type);
            } catch (NoUniqueBeanDefinitionException multiBeanExe) {
                logger.warn("Find more than 1 spring extensions (beans) of type " + type.getName() + ", will stop auto injection. Please make sure you have specified the concrete parameter type and there''s only one extension of that type.");
            } catch (NoSuchBeanDefinitionException noBeanExe) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Error when get spring extension(bean) for type:" + type.getName(), noBeanExe);
                }
            }
        }

        logger.warn("No spring extension (bean) named:" + name + ", type:" + type.getName() + " found, stop get bean.");

        return null;
    }

    private static class ShutdownHookListener implements ApplicationListener {
        @Override
        public void onApplicationEvent(ApplicationEvent event) {
            if (event instanceof ContextClosedEvent) {
                DubboShutdownHook shutdownHook = DubboShutdownHook.getDubboShutdownHook();
                shutdownHook.doDestroy();
            }
        }
    }
}
  • SpringExtensionFactory 实现了 ExtensionFactory 接口,它使用 ConcurrentHashSet 来注册 spring 的 ApplicationContext

小结

ConcurrentHashSet 继承了 AbstractSet,实现了 Set、Serializable 接口;它底层使用 ConcurrentHashMap 来实现,其 value 固定为 PRESENT

doc

  • ConcurrentHashSet

com.intellij.util.containers.ConcurrentHashSet的实例源码

com.intellij.util.containers.ConcurrentHashSet的实例源码

项目:tools-idea    文件:MockComponentManager.java   
public MockComponentManager(@Nullable picocontainer parent,@NotNull disposable parentdisposable) {
  mypicocontainer = new Ideapicocontainer(parent) {
    private Set<Object> mydisposableComponents = new ConcurrentHashSet<Object>();

    @Override
    @Nullable
    public Object getComponentInstance(final Object componentKey) {
      final Object o = super.getComponentInstance(componentKey);
      if (o instanceof disposable && o != MockComponentManager.this) {
        if (mydisposableComponents.add(o))
          disposer.register(MockComponentManager.this,(disposable)o);
      }
      return o;
    }
  };
  mypicocontainer.registerComponentInstance(this);
  disposer.register(parentdisposable,this);
}
项目:tools-idea    文件:SliceLeafAnalyzer.java   
public static Map<SliceNode,Collection<PsiElement>> createMap() {
  return new FactoryMap<SliceNode,Collection<PsiElement>>() {
    @Override
    protected Map<SliceNode,Collection<PsiElement>> createMap() {
      return new ConcurrentHashMap<SliceNode,Collection<PsiElement>>(ContainerUtil.<SliceNode>identityStrategy());
    }

    @Override
    protected Collection<PsiElement> create(SliceNode key) {
      return new ConcurrentHashSet<PsiElement>(SliceLeafAnalyzer.LEAF_ELEMENT_EQUALITY);
    }
  };
}
项目:tools-idea    文件:ExternalSystemProgressnotificationmanagerImpl.java   
@Override
public boolean addNotificationListener(@NotNull ExternalSystemTaskId taskId,@NotNull ExternalSystemTaskNotificationListener listener) {
  Set<ExternalSystemTaskId> ids = null;
  while (ids == null) {
    if (myListeners.containsKey(listener)) {
      ids = myListeners.get(listener);
    }
    else {
      ids = myListeners.putIfAbsent(listener,new ConcurrentHashSet<ExternalSystemTaskId>());
    }
  }
  return ids.add(taskId);
}
项目:tools-idea    文件:VirtualFileTrackerImpl.java   
private static Set<VirtualFileListener> getSet(final String fileUrl,final Map<String,Set<VirtualFileListener>> map) {
  Set<VirtualFileListener> listeners = map.get(fileUrl);

  if (listeners == null) {
    listeners = new ConcurrentHashSet<VirtualFileListener>();
    map.put(fileUrl,listeners);
  }
  return listeners;
}
项目:tools-idea    文件:FormattingProgresstask.java   
private Collection<Runnable> getCallbacks(@NotNull EventType eventType) {
  Collection<Runnable> result = myCallbacks.get(eventType);
  if (result == null) {
    Collection<Runnable> candidate = myCallbacks.putIfAbsent(eventType,result = new ConcurrentHashSet<Runnable>());
    if (candidate != null) {
      result = candidate;
    }
  }
  return result;
}
项目:consulo    文件:ExternalSystemProgressnotificationmanagerImpl.java   
@Override
public boolean addNotificationListener(@Nonnull ExternalSystemTaskId taskId,@Nonnull ExternalSystemTaskNotificationListener listener) {
  Set<ExternalSystemTaskId> ids = null;
  while (ids == null) {
    if (myListeners.containsKey(listener)) {
      ids = myListeners.get(listener);
    }
    else {
      ids = myListeners.putIfAbsent(listener,new ConcurrentHashSet<ExternalSystemTaskId>());
    }
  }
  return ids.add(taskId);
}
项目:consulo    文件:VirtualFileTrackerImpl.java   
private static Set<VirtualFileListener> getSet(final String fileUrl,listeners);
  }
  return listeners;
}
项目:consulo    文件:FormattingProgresstask.java   
private Collection<Runnable> getCallbacks(@Nonnull EventType eventType) {
  Collection<Runnable> result = myCallbacks.get(eventType);
  if (result == null) {
    Collection<Runnable> candidate = myCallbacks.putIfAbsent(eventType,result = new ConcurrentHashSet<Runnable>());
    if (candidate != null) {
      result = candidate;
    }
  }
  return result;
}
项目:consulo-java    文件:SliceLeafAnalyzer.java   
public static Map<SliceNode,Collection<PsiElement>>(ContainerUtil.<SliceNode>identityStrategy());
    }

    @Override
    protected Collection<PsiElement> create(SliceNode key) {
      return new ConcurrentHashSet<PsiElement>(SliceLeafAnalyzer.LEAF_ELEMENT_EQUALITY);
    }
  };
}
项目:tools-idea    文件:IncProjectBuilder.java   
private static CompileContext createContextwrapper(final CompileContext delegate) {
  final ClassLoader loader = delegate.getClass().getClassLoader();
  final UserDataHolderBase localDataHolder = new UserDataHolderBase();
  final Set<Object> deletedKeysSet = new ConcurrentHashSet<Object>();
  final Class<UserDataHolder> dataHolderInterface = UserDataHolder.class;
  final Class<MessageHandler> messageHandlerInterface = MessageHandler.class;
  return (CompileContext)Proxy.newProxyInstance(loader,new Class[]{CompileContext.class},new InvocationHandler() {
    @Override
    public Object invoke(Object proxy,Method method,Object[] args) throws Throwable {
      final Class<?> declaringClass = method.getDeclaringClass();
      if (dataHolderInterface.equals(declaringClass)) {
        final Object firstArgument = args[0];
        if (!(firstArgument instanceof GlobalContextKey)) {
          final boolean isWriteOperation = args.length == 2 /*&& void.class.equals(method.getReturnType())*/;
          if (isWriteOperation) {
            if (args[1] == null) {
              deletedKeysSet.add(firstArgument);
            }
            else {
              deletedKeysSet.remove(firstArgument);
            }
          }
          else {
            if (deletedKeysSet.contains(firstArgument)) {
              return null;
            }
          }
          final Object result = method.invoke(localDataHolder,args);
          if (isWriteOperation || result != null) {
            return result;
          }
        }
      }
      else if (messageHandlerInterface.equals(declaringClass)) {
        final BuildMessage msg = (BuildMessage)args[0];
        if (msg.getKind() == BuildMessage.Kind.ERROR) {
          Utils.ERRORS_DETECTED_KEY.set(localDataHolder,Boolean.TRUE);
        }
      }
      try {
        return method.invoke(delegate,args);
      }
      catch (InvocationTargetException e) {
        final Throwable targetEx = e.getTargetException();
        if (targetEx instanceof ProjectBuildException) {
          throw targetEx;
        }
        throw e;
      }
    }
  });
}
项目:tools-idea    文件:JarFileSystemImpl.java   
public JarFileSystemImpl(MessageBus bus) {
  boolean nocopy = SystemProperties.getBooleanProperty("idea.jars.nocopy",!SystemInfo.isWindows);
  myNocopyJarPaths = nocopy ? null : new ConcurrentHashSet<String>(FileUtil.PATH_HASHING_STRATEGY);

  bus.connect().subscribe(VirtualFileManager.VFS_CHANGES,new BulkFileListener.Adapter() {
    @Override
    public void after(@NotNull final List<? extends VFileEvent> events) {
      final List<VirtualFile> rootsToRefresh = new ArrayList<VirtualFile>();

      for (VFileEvent event : events) {
        if (event.getFileSystem() instanceof LocalFileSystem) {
          String path = event.getPath();

          String[] jarPaths;
          synchronized (LOCK) {
            jarPaths = jarPathsCache;
            if (jarPaths == null) {
              jarPathsCache = jarPaths = ArrayUtil.toStringArray(myHandlers.keySet());
            }
          }

          for (String jarPath : jarPaths) {
            final String jarFile = jarPath.substring(0,jarPath.length() - JAR_SEParaTOR.length());
            if (FileUtil.startsWith(jarFile,path)) {
              VirtualFile jarRoottoRefresh = markDirty(jarPath);
              if (jarRoottoRefresh != null) {
                rootsToRefresh.add(jarRoottoRefresh);
              }
            }
          }
        }
      }

      if (!rootsToRefresh.isEmpty()) {
        final Application app = ApplicationManager.getApplication();
        Runnable runnable = new Runnable() {
          @Override
          public void run() {
            if (app.isdisposed()) return;
            for (VirtualFile root : rootsToRefresh) {
              if (root.isValid()) {
                ((NewVirtualFile)root).markDirtyRecursively();
              }
            }
            RefreshQueue.getInstance().refresh(false,true,null,rootsToRefresh);
          }
        };
        if (app.isUnitTestMode()) {
          runnable.run();
        }
        else {
          app.invokelater(runnable,ModalityState.NON_MODAL);
        }
      }
    }
  });
}
项目:tools-idea    文件:GroovyConstructorUsagesSearcher.java   
static void processConstructorUsages(final PsiMethod constructor,final SearchScope searchScope,final Processor<PsiReference> consumer,final SearchRequestCollector collector,final boolean searchGppCalls,final boolean includeOverloads) {
  if (!constructor.isConstructor()) return;

  final PsiClass clazz = constructor.getContainingClass();
  if (clazz == null) return;

  SearchScope onlyGroovy = GroovyScopeUtil.restrictScopetoGroovyFiles(searchScope,GroovyScopeUtil.getEffectiveScope(constructor));
  Set<PsiClass> processed = collector.getSearchSession().getUserData(LIteraLLY_CONSTRUCTED_CLASSES);
  if (processed == null) {
    collector.getSearchSession().putUserData(LIteraLLY_CONSTRUCTED_CLASSES,processed = new ConcurrentHashSet<PsiClass>());
  }
  if (!processed.add(clazz)) return;

  if (clazz.isEnum() && clazz instanceof GroovyPsiElement) {
    for (PsiField field : clazz.getFields()) {
      if (field instanceof GrEnumConstant) {
        final PsiReference ref = field.getReference();
        if (ref != null && ref.isReferenceto(constructor)) {
          if (!consumer.process(ref)) return;
        }
      }
    }
  }

  final LiteralConstructorSearcher literalProcessor = new LiteralConstructorSearcher(constructor,consumer,includeOverloads);

  final Processor<GrNewExpression> newExpressionProcessor = new Processor<GrNewExpression>() {
    @Override
    public boolean process(GrNewExpression grNewExpression) {
      final PsiMethod resolvedConstructor = grNewExpression.resolveMethod();
      if (includeOverloads || constructor.getManager().areElementsEquivalent(resolvedConstructor,constructor)) {
        return consumer.process(grNewExpression.getReferenceElement());
      }
      return true;
    }
  };

  processGroovyClassUsages(clazz,searchScope,collector,searchGppCalls,newExpressionProcessor,literalProcessor);

  //this()
  if (clazz instanceof GrTypeDeFinition) {
    if (!processConstructors(constructor,clazz,true)) {
      return;
    }
  }
  //super()
  DirectClassInheritoRSSearch.search(clazz,onlyGroovy).forEach(new ReadActionProcessor<PsiClass>() {
    @Override
    public boolean processInReadAction(PsiClass inheritor) {
      if (inheritor instanceof GrTypeDeFinition) {
        if (!processConstructors(constructor,inheritor,false)) return false;
      }
      return true;
    }
  });
}

concurrentHashMap

concurrentHashMap

 

TSHIFT:表格中的位移数量

TBASE :表格进行数据保存时,进行索引跳越的值

1.segment <HashEntry> []  segment表;进行保存

2.HashEntry<K,V>[] 链表结构 保存在segment中

put(){

se=ensureSegment() segement初始化,返回某一个保存的segment (类似于分表,找到表)

se.scanAndLockForPut();获取锁 进行segemen中的hashEntry保存

scanAndLockForPut(){

    entryForHash();获取segement 中 hashEntry数组中第一个保存链表的索引

     进行链表结构的数据保存操作

}

进行数据结构检查 并返回数据值

 

 

ConcurrentHashMap computeIfAbsent

ConcurrentHashMap computeIfAbsent

Java 8中引入了一个新的computeIfAbsent API。ConcurrentHashMap
实现它的javadocs 状态如下:

如果指定的键尚未与值关联,则尝试使用给定的映射函数计算其值,除非为null,否则将其输入此映射。整个方法调用是原子执行的,因此每个键最多可应用一次该功能。在进行计算时,可能会阻止其他线程在此映射上进行的某些尝试的更新操作,因此计算应简短而简单,并且不得尝试更新此映射的任何其他映射。

那么,当密钥已经存在并且不需要计算时,对于该实现的锁定有什么看法呢?即使不需要计算,还是仅同步映射函数调用以防止两次调用该函数,整个方法是否都按照docs中所述进行了同步?

答案1

小编典典

ConcurrentHashMap的实现非常复杂,因为它专门设计为允许并发可读性,同时最大程度地减少了更新争用。在非常高的抽象水平上,它被组织为一个存储桶的哈希表。所有读取操作都不需要锁定,并且(引用javadoc)
“不支持以阻止所有访问的方式锁定整个表”
。为了实现这一点,内部设计非常复杂(但仍然很精致),其中的键值映射保存在节点中,可以利用各种方式(例如列表或平衡树)来安排键值映射,以利用细粒度的锁。如果您对实现细节感兴趣,还可以查看源代码。

尝试回答您的问题:

那么,当密钥已经存在并且不需要计算时,对于该实现的锁定有什么看法呢?

可以合理地认为,与任何读取操作一样,不需要锁定就可以检查密钥是否已存在并且不需要执行映射功能。

即使不需要计算,还是仅同步映射函数调用以防止两次调用该函数,整个方法是否都按照docs中所述进行了同步?

不,该方法在锁定方面不是 同步
的,但是从调用方的角度来看,它是原子执行的(即,映射功能最多只能应用一次)。如果未找到密钥,则必须使用由映射函数计算的值执行更新操作,并且在调用该函数时涉及某种锁定。可以合理地认为这样的锁定是非常细粒度的,并且只涉及表的一小部分(很好,必须存储键的特定数据结构),这就是为什么(引用javadoc,强调我的观点)
“ 计算正在进行时,其他线程尝试的 某些 更新操作 可能会 被阻止”

ConcurrentHashMap 、HashTable

ConcurrentHashMap 、HashTable

java集合类中非线程安全的集合可以用同步包装器使集合变成线程安全,比如:
HashMap --> Collections.synchronizedMap(new HashMap()),
那如果比较Collections.synchronizedMap(new HashMap())和HashTable的话,
哪个的效率更高呢?类似的对应集合(HashMap-HashTable)还有哪些呢,每个

的效率又如何呢?

其实Java集合用到了很多的数据结构,用不同的数据结构实现的Java集合类,当然在不同的场合中使用有不同的性能表现;下面简单说说你提到的这两个类似的java集合类在多线程下的效率问题(一般都是时间方面):

首先 Collections.synchronizedMap(new HashMap())和HashTable 都是同步集合,Collections.synchronizedMap采用的同步块实现,HashTable采用的是同步方法实现,在多线程环境下,一般认为是同步块好于同步方法
其次,说到多线程,那线程安全是很重要的,这两个同步集合对于一些常见的简单操作:如put, remove, get等,线程安全对于单个操作来说是没什么问题的,但对于一些复合操作来说,如同时put和remove,迭代等,要保证线程安全,就必须在使用时 同步加锁 了,也就是确保像 put和remove 是一个原子操作,当然两种Java集合类都采用hash算法实现,运行效率不会有什么大的区别;前面说到它们都是同步集合,也就是说在多线程下并发操作时,不管是简单的操作还是复合操作,为了线程安全,都是串行的操作,这样就会大大降低程序的效率,所以呢,并发集合ConcurrentHashMap出现了,这样即解决的并发问题,也解决了效率问题;
最后,建议你在多线程下,就用ConcurrentHashMap吧,用它来替换你说的那两个同步集合;

鱼和熊掌的问题了。
比如竞标系统,系统不会太大,而且并发量也不会太高。
这时候你就不需要去考虑空间管理等问题,速度和效率是第一位的。

而如果是银行项目,如果效率和安全二选其一,你是程序员,你怎么选?


http://blog.csdn.net/ghevinn/article/details/37764791


关于聊聊 dubbo 的 ConcurrentHashSetdubbo consul的介绍现已完结,谢谢您的耐心阅读,如果想了解更多关于com.intellij.util.containers.ConcurrentHashSet的实例源码、concurrentHashMap、ConcurrentHashMap computeIfAbsent、ConcurrentHashMap 、HashTable的相关知识,请在本站寻找。

本文标签: