分类目录归档:elasticsearch

Elasticsearch源码分析之一——使用Guice进行依赖注入与模块化系统

elasticsearch使用google开源的依赖注入框架guice,这个项目号称比spring快100倍,具体性能没有测试过,不过由于其代码比较简洁,比spring快很有可能,是不是快那么多就不知道了。先介绍下guice的基本使用方法。

elasticsearch是直接把guice的源码放到自己的包内(es把很多开源项目的代码都直接集成到自己项目中,省得依赖一堆的jar包,也使es的jar包达到差不多10M),在org.elasticsearch.common.inject目录下。
Guice主要是使用Module这个接口来确定各个接口和它们对应的实现。这个Module是个单例的抽象接口,通过bind(A).to(B)来绑定指定实例到这个模块中,下面看下Guice官方文档中的例子:
public class BillingModule extends AbstractModule {
  @Override
  protected void configure() {
    bind(TransactionLog.class).to(DatabaseTransactionLog.class);
    bind(CreditCardProcessor.class).to(PaypalCreditCardProcessor.class);
    bind(BillingService.class).to(RealBillingService.class);
  }
}
上面定义了一个订单模块,扩展AbstractModule这个抽象类。这个模块里面有三个实例:交易日志、支付过程和账单服务。通过bind(“interface”).to(“implement”)来使接口和实现绑定。
public class RealBillingService implements BillingService {
  private final CreditCardProcessor processor;
  private final TransactionLog transactionLog;
  @Inject
  public RealBillingService(CreditCardProcessor processor,
      TransactionLog transactionLog) {
    this.processor = processor;
    this.transactionLog = transactionLog;
  }
  public Receipt chargeOrder(PizzaOrder order, CreditCard creditCard) {
    try {
      ChargeResult result = processor.charge(creditCard, order.getAmount());
      transactionLog.logChargeResult(result);
      return result.wasSuccessful()
          ? Receipt.forSuccessfulCharge(order.getAmount())
          : Receipt.forDeclinedCharge(result.getDeclineMessage());
     } catch (UnreachableException e) {
      transactionLog.logConnectException(e);
      return Receipt.forSystemFailure(e.getMessage());
    }
  }
}
上面类是BillService接口的实现类。其中要注意的就是@Inject这个注释。Guice的Injector类会扫描@Inject这类注释,找到方法中传入参数的实例进行注入。如上面的CreditCardLog和TransactionLog。
publicstaticvoidmain(String[] args) {
    Injector injector = Guice.createInjector(newBillingModule());
    BillingService billingService = injector.getInstance(BillingService.class);
    ...
  }
最后,在main方法中使用Injector进行注入与获取实例。这就是使用Guice进行依赖注入的一个简单例子。elasticsearch里面的组件基本都是用上面的方式进行模块化管理,elasticsearch对guice进行了简单的封装,通过ModulesBuilder类构建es的模块,一个es节点包括下面模块:
PluginsModule:插件模块
SettingsModule:设置参数模块
NodeModule:节点模块
NetworkModule:网络模块
NodeCacheModule:缓存模块
ScriptModule:脚本模块
JmxModule:jmx模块
EnvironmentModule:环境模块
NodeEnvironmentModule:节点环境模块
ClusterNameModule:集群名模块
ThreadPoolModule:线程池模块
DiscoveryModule:自动发现模块
ClusterModule:集群模块
RestModule:rest模块
TransportModule:tcp模块
HttpServerModule:http模块
RiversModule:river模块
IndicesModule:索引模块
SearchModule:搜索模块
ActionModule:行为模块
MonitorModule:监控模块
GatewayModule:持久化模块
NodeClientModule:客户端模块

接下来的文章会分析其中一些重要的模块。

摘自:http://www.searchtech.pro/articles/2013/02/15/1360942810308.html

分布式搜索Elasticsearch源码分析之二–索引过程源码概要分析

elasticsearch的索引逻辑简单分析,这里只是理清主要的脉络,一些细节方面以后的文章或会阐述。
假如通过java api来调用es的索引接口,先是构造成一个json串(es里表示为XContent,是对要处理的内容进行抽象),在IndexRequest里面指定要索引文档到那个索引库(index)、其类型(type)还有文档的id,如果没有指定文档的id,es会通过UUID工具自动生成一个uuid,代码在IndexRequest的process方法内。
if (allowIdGeneration) {
     if (id == null) {
         id(UUID.randomBase64UUID());
         opType(IndexRequest.OpType.CREATE);
     }
 }
然后使用封装过netty的TransportService通过tcp协议发送请求到es服务器(rest的话就是通过http协议)。
服务器获得TransportAction后解析索引请求(TransportShardReplicationOperationAction)。到AsyncShardOperationAction.start()方法开始进行分片操作,先读取集群状态,把目标索引及其分片信息提取出来,根据索引数据的id、类型以及索引分片信息进行哈希取模,确定把该条数据分配到那个分片。
private int shardId(ClusterState clusterState, String index, String type, @Nullable String id, @Nullable String routing) {
     if (routing == null) {
         if (!useType) {
             return Math.abs(hash(id) % indexMetaData(clusterState, index).numberOfShards());
         } else {
             return Math.abs(hash(type, id) % indexMetaData(clusterState, index).numberOfShards());
         }
     }
     return Math.abs(hash(routing) % indexMetaData(clusterState, index).numberOfShards());
 }
并找到数据要分配到的分片的主分片,先把索引请求提交到主分片处理(TransportIndexAction.shardOperationOnPrimary)。
判断是否必须要指定routing值
MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mappingOrDefault(request.type());
  if (mappingMd != null && mappingMd.routing().required()) {
      if (request.routing() == null) {
          throw new RoutingMissingException(request.index(), request.type(), request.id());
      }
  }
判断索引操作的类型,索引操作有两种,一种是INDEX,当要索引的文档id已经存在时,不会覆盖原来的文档,只是更新原来文档。一种是CREATE,当索引文档id存在时,会抛出该文档已存在的错误。
if (request.opType() == IndexRequest.OpType.INDEX)
调用InternalIndexShard进行索引操作。
Engine.Index index = indexShard.prepareIndex(sourceToParse)
        .version(request.version())
        .versionType(request.versionType())
        .origin(Engine.Operation.Origin.PRIMARY);
indexShard.index(index);
通过(InternalIndexShard)查找与请求索引数据类型(type)相符的mapping。对要索引的json字符串进行解析,根据mapping转换为对应的解析结果ParsedDocument 。
public Engine.Index prepareIndex(SourceToParse source) throws ElasticSearchException {
    long startTime = System.nanoTime();
    DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type());
    ParsedDocument doc = docMapper.parse(source);
    return new Engine.Index(docMapper, docMapper.uidMapper().term(doc.uid()), doc).startTime(startTime);
}
最后调用RobinEngine中的相关方法(添加或修改)对底层lucene进行操作,这里是写入到lucene的内存索引中(RobinEngine.innerIndex)。
if (currentVersion == -1) {
       // document does not exists, we can optimize for create
       if (index.docs().size() > 1) {
           writer.addDocuments(index.docs(), index.analyzer());
       } else {
           writer.addDocument(index.docs().get(0), index.analyzer());
       }
   } else {
       if (index.docs().size() > 1) {
           writer.updateDocuments(index.uid(), index.docs(), index.analyzer());
       } else {
           writer.updateDocument(index.uid(), index.docs().get(0), index.analyzer());
       }
   }
写入内存索引后还会写入到Translog(Translog是对索引的操作日志,会记录没有持久化的操作)中,防止flush前断电导致索引数据丢失。
Translog.Location translogLocation = translog.add(new Translog.Create(create));
主分片索引请求完就把请求发给副本进行索引操作。最后把成功信息返回给客户端。
摘自:http://www.searchtech.pro/articles/2013/02/15/1360941961206.html