/**
 * Issues an OPTIMIZE update action against this server.
 *
 * The work is delegated to a fresh {@link UpdateRequest}: the action and its
 * flags are recorded on the request via {@code setAction}, and the request is
 * then dispatched through {@code process(this)}.
 *
 * @param waitFlush    forwarded unchanged to {@code setAction}
 * @param waitSearcher forwarded unchanged to {@code setAction}
 * @param maxSegments  forwarded unchanged to {@code setAction}
 * @return the response produced by processing the update request
 * @throws SolrServerException propagated from {@code process}
 * @throws IOException         propagated from {@code process}
 */
public UpdateResponse optimize(boolean waitFlush, boolean waitSearcher, int maxSegments ) throws SolrServerException, IOException {
  // Author's note (translated): like the other update calls this goes through
  // process(); setAction exists to populate the ModifiableSolrParams that
  // CommonHttpSolrServer.request() eventually sends — presumably triggering
  // segment merging/compaction on the server; verify against the Solr docs.
  UpdateRequest optimizeRequest = new UpdateRequest();
  return optimizeRequest
      .setAction( UpdateRequest.ACTION.OPTIMIZE, waitFlush, waitSearcher, maxSegments )
      .process( this );
}
5.既然上面都提到了process方法,那我们来看看
/**
 * Sends this request to the given server and wraps the raw result in an
 * {@link UpdateResponse}, recording how long the round trip took.
 *
 * @param server the server whose {@code request} method executes this request
 * @return a response holding the server's result and the elapsed wall-clock
 *         time in milliseconds
 * @throws SolrServerException propagated from {@code server.request}
 * @throws IOException         propagated from {@code server.request}
 */
@Override
public UpdateResponse process( SolrServer server ) throws SolrServerException, IOException
{
  final long begunAt = System.currentTimeMillis();
  final UpdateResponse response = new UpdateResponse();

  // This is the key call: the actual work happens in the server's request()
  // (per the original author, CommonHttpSolrServer.request).
  response.setResponse( server.request( this ) );

  final long elapsedMillis = System.currentTimeMillis() - begunAt;
  response.setElapsedTime( elapsedMillis );
  return response;
}
6.最终的方法是SolrServer的子类CommonHttpSolrServer类的request方法,咱再来看看这个方法是怎么工作的
public NamedList<Object> request(final SolrRequest request, ResponseParser processor ) throws SolrServerException, IOException {
HttpMethod method = null;
InputStream is = null;
SolrParams params = request.getParams();
Collection<ContentStream> streams = requestWriter.getContentStreams(request);
String path = requestWriter.getPath(request);
//创建索引进来的是/update /select 为查询
if( path == null || !path.startsWith( "/" ) ) {
path = "/select";
}
ResponseParser parser = request.getResponseParser();
if( parser == null ) {
parser = _parser;
}
// The parser 'wt=' and 'version=' params are used instead of the original params
ModifiableSolrParams wparams = new ModifiableSolrParams();
wparams.set( CommonParams.WT, parser.getWriterType() );
wparams.set( CommonParams.VERSION, parser.getVersion());
if( params == null ) {
params = wparams;
}
else {
params = new DefaultSolrParams( wparams, params );
}
if( _invariantParams != null ) {
params = new DefaultSolrParams( _invariantParams, params );
}
int tries = _maxRetries + 1;
try {
while( tries-- > 0 ) {
// Note: since we aren't do intermittent time keeping
// ourselves, the potential non-timeout latency could be as
// much as tries-times (plus scheduling effects) the given
// timeAllowed.
try {//通过使用查看solr源码,在使用UpdateRequest对象时会自动设置为Post
if( SolrRequest.METHOD.GET == request.getMethod() ) {
if( streams != null ) {
<SPAN></SPAN>throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "GET can't send streams!" );
}
method = new GetMethod( _baseURL + path + ClientUtils.toQueryString( params, false ) );
}
else if( SolrRequest.METHOD.POST == request.getMethod() ) {//所以我们直接看
String url = _baseURL + path;
boolean isMultipart = ( streams != null && streams.size() > 1 );
if (streams == null || isMultipart) {
PostMethod post = new PostMethod(url);//设置post,包括request头部、内容、参数、等等一些操作
post.getParams().setContentCharset("UTF-8");
if (!this.useMultiPartPost && !isMultipart) {
post.addRequestHeader("Content-Type",
"application/x-www-form-urlencoded; charset=UTF-8");
}
List<Part> parts = new LinkedList<Part>();
Iterator<String> iter = params.getParameterNamesIterator();
while (iter.hasNext()) {
String p = iter.next();
String[] vals = params.getParams(p);
if (vals != null) {
for (String v : vals) {
if (this.useMultiPartPost || isMultipart) {
parts.add(new StringPart(p, v, "UTF-8"));
} else {
post.addParameter(p, v);
}
}
}
}
if (isMultipart) {
int i = 0;
for (ContentStream content : streams) {
final ContentStream c = content;
String charSet = null;
PartSource source = new PartSource() {
public long getLength() {
return c.getSize();
}
public String getFileName() {
return c.getName();
}
public InputStream createInputStream() throws IOException {
return c.getStream();
}
};
parts.add(new FilePart(c.getName(), source,
c.getContentType(), charSet));
}
}
if (parts.size() > 0) {
post.setRequestEntity(new MultipartRequestEntity(parts
.toArray(new Part[parts.size()]), post.getParams()));
}
method = post;
}
// It is has one stream, it is the post body, put the params in the URL
else {
String pstr = ClientUtils.toQueryString(params, false);
PostMethod post = new PostMethod(url + pstr);
// Single stream as body
// Using a loop just to get the first one
final ContentStream[] contentStream = new ContentStream[1];
for (ContentStream content : streams) {
contentStream[0] = content;
break;
}
if (contentStream[0] instanceof RequestWriter.LazyContentStream) {
post.setRequestEntity(new RequestEntity() {
public long getContentLength() {
return -1;
}
public String getContentType() {
return contentStream[0].getContentType();
}
public boolean isRepeatable() {
return false;
}
public void writeRequest(OutputStream outputStream) throws IOException {
((RequestWriter.LazyContentStream) contentStream[0]).writeTo(outputStream);
}
}
);
} else {
is = contentStream[0].getStream();
post.setRequestEntity(new InputStreamRequestEntity(is, contentStream[0].getContentType()));
}
method = post;
}
}
else {
throw new SolrServerException("Unsupported method: "+request.getMethod() );
}
}
catch( NoHttpResponseException r ) {
// This is generally safe to retry on
method.releaseConnection();
method = null;
if(is != null) {
is.close();
}
// If out of tries then just rethrow (as normal error).
if( ( tries < 1 ) ) {
throw r;
}
//log.warn( "Caught: " + r + ". Retrying..." );
}
}
}
catch( IOException ex ) {
throw new SolrServerException("error reading streams", ex );
}
method.setFollowRedirects( _followRedirects );
method.addRequestHeader( "User-Agent", AGENT );
if( _allowCompression ) {
method.setRequestHeader( new Header( "Accept-Encoding", "gzip,deflate" ) );
}
try {
// Execute the method.
//System.out.println( "EXECUTE:"+method.getURI() );
//执行请求,返回状态码,然后组装response 最后返回
int statusCode = _httpClient.executeMethod(method);
if (statusCode != HttpStatus.SC_OK) {
StringBuilder msg = new StringBuilder();
msg.append( method.getStatusLine().getReasonPhrase() );
msg.append( "\n\n" );
msg.append( method.getStatusText() );
msg.append( "\n\n" );
msg.append( "request: "+method.getURI() );
throw new SolrException(statusCode, Java.net.URLDecoder.decode(msg.toString(), "UTF-8") );
}
// Read the contents
String charset = "UTF-8";
if( method instanceof HttpMethodBase ) {
charset = ((HttpMethodBase)method).getResponseCharSet();
}
InputStream respBody = method.getResponseBodyAsStream();
// Jakarta Commons HTTPClient doesn't handle any
// compression natively. Handle gzip or deflate
// here if applicable.
if( _allowCompression ) {
Header contentEncodingHeader = method.getResponseHeader( "Content-Encoding" );
if( contentEncodingHeader != null ) {
String contentEncoding = contentEncodingHeader.getValue();
if( contentEncoding.contains( "gzip" ) ) {
//log.debug( "wrapping response in GZIPInputStream" );
respBody = new GZIPInputStream( respBody );
}
else if( contentEncoding.contains( "deflate" ) ) {
//log.debug( "wrapping response in InflaterInputStream" );
respBody = new InflaterInputStream(respBody);
}
}
else {
Header contentTypeHeader = method.getResponseHeader( "Content-Type" );
if( contentTypeHeader != null ) {
String contentType = contentTypeHeader.getValue();
if( contentType != null ) {
if( contentType.startsWith( "application/x-gzip-compressed" ) ) {
//log.debug( "wrapping response in GZIPInputStream" );
respBody = new GZIPInputStream( respBody );
}
else if ( contentType.startsWith("application/x-deflate") ) {
//log.debug( "wrapping response in InflaterInputStream" );
respBody = new InflaterInputStream(respBody);
}
}
}
}
}
return processor.processResponse(respBody, charset);
}
catch (HttpException e) {
throw new SolrServerException( e );
}
catch (IOException e) {
throw new SolrServerException( e );
}
finally {
method.releaseConnection();
if(is != null) {
is.close();
}
}
}
下面是文字说明:
1.查询数据库或者读取文件等等,按照自己的方式存入SolrInputDocument中。SolrInputDocument中会定义一个map来存储(真正的对象是SolrInputField)
2.初始化CommonHttpSolrServer ,包括服务url(solr服务地址)、超时时间、最大链接数等等 (SolrUtil类)
3.SolrServer类的add/commit/optimize方法最终调用的都是 AbstractUpdateRequest类中的process方法