Solr创建索引源码解析(3)

public UpdateResponse optimize(boolean waitFlush, boolean waitSearcher,                int maxSegments ) throws SolrServerException, IOException {
            return new UpdateRequest().setAction( UpdateRequest.ACTION.OPTIMIZE, waitFlush, waitSearcher, maxSegments ).process( this );//同样调用process,通过setAction参数,在CommonHttpSolrServer类方法request()中主要执行的是合并和压缩  setAction都是为了对对象ModifiableSolrParams(这个对象在最终CommonHttpSolrServer的request方法中用的到)进行赋值
  }

5.既然上面都提到了process方法,那我们来看看

@Override
    public UpdateResponse process( SolrServer server ) throws SolrServerException,            IOException
    {
          long startTime = System.currentTimeMillis();
          UpdateResponse res = new UpdateResponse();
          res.setResponse( server.request( this ) );//这里面这个方法可是重点之重啊,这是调用了 CommonHttpSolrServer类中的request方法
          res.setElapsedTime( System.currentTimeMillis()-startTime );
          return res;
    }

6.最终的方法是SolrServer的子类CommonsHttpSolrServer类的request方法,咱们再来看看这个方法是怎么工作的

public NamedList<Object> request(final SolrRequest request, ResponseParser processor    ) throws SolrServerException, IOException {
   
    HttpMethod method = null;
    InputStream is = null;
    SolrParams params = request.getParams();
    Collection<ContentStream> streams = requestWriter.getContentStreams(request);
    String path = requestWriter.getPath(request);
   
    //创建索引进来的是/update  /select 为查询 
    if( path == null || !path.startsWith( "/" ) ) {
      path = "/select";
    }
   
    ResponseParser parser = request.getResponseParser();
    if( parser == null ) {
      parser = _parser;
    }
   
    // The parser 'wt=' and 'version=' params are used instead of the original params
    ModifiableSolrParams wparams = new ModifiableSolrParams();
    wparams.set( CommonParams.WT, parser.getWriterType() );
    wparams.set( CommonParams.VERSION, parser.getVersion());
    if( params == null ) {
      params = wparams;
    }
    else {
      params = new DefaultSolrParams( wparams, params );
    }
   
    if( _invariantParams != null ) {
      params = new DefaultSolrParams( _invariantParams, params );
    }

int tries = _maxRetries + 1;
    try {
      while( tries-- > 0 ) {
        // Note: since we aren't do intermittent time keeping
        // ourselves, the potential non-timeout latency could be as
        // much as tries-times (plus scheduling effects) the given
        // timeAllowed.
        try {//通过使用查看solr源码,在使用UpdateRequest对象时会自动设置为Post
          if( SolrRequest.METHOD.GET == request.getMethod() ) {
            if( streams != null ) {
                  <SPAN></SPAN>throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "GET can't send streams!" );
            }
            method = new GetMethod( _baseURL + path + ClientUtils.toQueryString( params, false ) );
          }
          else if( SolrRequest.METHOD.POST == request.getMethod() ) {//所以我们直接看

String url = _baseURL + path;
            boolean isMultipart = ( streams != null && streams.size() > 1 );

if (streams == null || isMultipart) {
              PostMethod post = new PostMethod(url);//设置post,包括request头部、内容、参数、等等一些操作
              post.getParams().setContentCharset("UTF-8");
              if (!this.useMultiPartPost && !isMultipart) {
                post.addRequestHeader("Content-Type",
                    "application/x-www-form-urlencoded; charset=UTF-8");
              }

List<Part> parts = new LinkedList<Part>();
              Iterator<String> iter = params.getParameterNamesIterator();
              while (iter.hasNext()) {
                String p = iter.next();
                String[] vals = params.getParams(p);
                if (vals != null) {
                  for (String v : vals) {
                    if (this.useMultiPartPost || isMultipart) {
                      parts.add(new StringPart(p, v, "UTF-8"));
                    } else {
                      post.addParameter(p, v);
                    }
                  }
                }
              }

if (isMultipart) {
                int i = 0;
                for (ContentStream content : streams) {
                  final ContentStream c = content;

String charSet = null;
                  PartSource source = new PartSource() {
                    public long getLength() {
                      return c.getSize();
                    }
                    public String getFileName() {
                      return c.getName();
                    }
                    public InputStream createInputStream() throws IOException {
                      return c.getStream();
                    }
                  };
               
                  parts.add(new FilePart(c.getName(), source,
                                        c.getContentType(), charSet));
                }
              }
              if (parts.size() > 0) {
                post.setRequestEntity(new MultipartRequestEntity(parts
                    .toArray(new Part[parts.size()]), post.getParams()));
              }

method = post;
            }
            // It is has one stream, it is the post body, put the params in the URL
            else {
              String pstr = ClientUtils.toQueryString(params, false);
              PostMethod post = new PostMethod(url + pstr);

// Single stream as body
              // Using a loop just to get the first one
              final ContentStream[] contentStream = new ContentStream[1];
              for (ContentStream content : streams) {
                contentStream[0] = content;
                break;
              }
              if (contentStream[0] instanceof RequestWriter.LazyContentStream) {
                post.setRequestEntity(new RequestEntity() {
                  public long getContentLength() {
                    return -1;
                  }

public String getContentType() {
                    return contentStream[0].getContentType();
                  }

public boolean isRepeatable() {
                    return false;
                  }

public void writeRequest(OutputStream outputStream) throws IOException {
                    ((RequestWriter.LazyContentStream) contentStream[0]).writeTo(outputStream);
                  }
                }
                );

} else {
                is = contentStream[0].getStream();
                post.setRequestEntity(new InputStreamRequestEntity(is, contentStream[0].getContentType()));
              }
              method = post;
            }
          }
          else {
            throw new SolrServerException("Unsupported method: "+request.getMethod() );
          }
        }
        catch( NoHttpResponseException r ) {
          // This is generally safe to retry on
          method.releaseConnection();
          method = null;
          if(is != null) {
            is.close();
          }
          // If out of tries then just rethrow (as normal error).
          if( ( tries < 1 ) ) {
            throw r;
          }
          //log.warn( "Caught: " + r + ". Retrying..." );
        }
      }
    }
    catch( IOException ex ) {
      throw new SolrServerException("error reading streams", ex );
    }

method.setFollowRedirects( _followRedirects );
    method.addRequestHeader( "User-Agent", AGENT );
    if( _allowCompression ) {
      method.setRequestHeader( new Header( "Accept-Encoding", "gzip,deflate" ) );
    }

try {
      // Execute the method.
      //System.out.println( "EXECUTE:"+method.getURI() );
      //执行请求,返回状态码,然后组装response 最后返回
      int statusCode = _httpClient.executeMethod(method);
      if (statusCode != HttpStatus.SC_OK) {
        StringBuilder msg = new StringBuilder();
        msg.append( method.getStatusLine().getReasonPhrase() );
        msg.append( "\n\n" );
        msg.append( method.getStatusText() );
        msg.append( "\n\n" );
        msg.append( "request: "+method.getURI() );
        throw new SolrException(statusCode, Java.net.URLDecoder.decode(msg.toString(), "UTF-8") );
      }

// Read the contents
      String charset = "UTF-8";
      if( method instanceof HttpMethodBase ) {
        charset = ((HttpMethodBase)method).getResponseCharSet();
      }
      InputStream respBody = method.getResponseBodyAsStream();
      // Jakarta Commons HTTPClient doesn't handle any
      // compression natively.  Handle gzip or deflate
      // here if applicable.
      if( _allowCompression ) {
        Header contentEncodingHeader = method.getResponseHeader( "Content-Encoding" );
        if( contentEncodingHeader != null ) {
          String contentEncoding = contentEncodingHeader.getValue();
          if( contentEncoding.contains( "gzip" ) ) {
            //log.debug( "wrapping response in GZIPInputStream" );
            respBody = new GZIPInputStream( respBody );
          }
          else if( contentEncoding.contains( "deflate" ) ) {
            //log.debug( "wrapping response in InflaterInputStream" );
            respBody = new InflaterInputStream(respBody);
          }
        }
        else {
          Header contentTypeHeader = method.getResponseHeader( "Content-Type" );
          if( contentTypeHeader != null ) {
            String contentType = contentTypeHeader.getValue();
            if( contentType != null ) {
              if( contentType.startsWith( "application/x-gzip-compressed" ) ) {
                //log.debug( "wrapping response in GZIPInputStream" );
                respBody = new GZIPInputStream( respBody );
              }
              else if ( contentType.startsWith("application/x-deflate") ) {
                //log.debug( "wrapping response in InflaterInputStream" );
                respBody = new InflaterInputStream(respBody);
              }
            }
          }
        }
      }
      return processor.processResponse(respBody, charset);
    }
    catch (HttpException e) {
      throw new SolrServerException( e );
    }
    catch (IOException e) {
      throw new SolrServerException( e );
    }
    finally {
      method.releaseConnection();
      if(is != null) {
        is.close();
      }
    }
  }

下面是文字说明:

1.查询数据库或者读取文件等等,按照自己的方式存入SolrInputDocument中。SolrInputDocument中会定义一个map来存储(真正的对象是SolrInputField)

2.初始化CommonsHttpSolrServer,包括服务url(solr服务地址)、超时时间、最大连接数等等(SolrUtil类)

3.SolrServer类的add/commit/optimize方法最终调用的都是 AbstractUpdateRequest类中的process方法

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/64d95134413939bb0ecdb114e34209ce.html