首先配置BinaryAnnotation信息,然后执行serverTracer.setServerSend,在finally中清除当前线程中的Span信息(不管前面是否清楚成功,最终都将执行该不走),ThreadLocal中的数据要做到有始有终
看serverTracer.setServerSend()
public void setServerSend() { if (submitEndAnnotation(zipkinCoreConstants.SERVER_SEND, spanCollector())) { spanAndEndpoint().state().setCurrentServerSpan(null); } }终于看到spanCollector收集器了,说明下面将看是收集Span信息,这里为ss注解
boolean submitEndAnnotation(String annotationName, SpanCollector spanCollector) { Span span = spanAndEndpoint().span(); if (span == null) { return false; } Annotation annotation = Annotation.create( currentTimeMicroseconds(), annotationName, spanAndEndpoint().endpoint() ); span.addToAnnotations(annotation); if (span.getTimestamp() != null) { span.setDuration(annotation.timestamp - span.getTimestamp()); } spanCollector.collect(span); return true; }首先获取当前线程中的Span信息,然后处理注解信息,通过annotation.timestamp - span.getTimestamp()计算延迟,
调用spanCollector.collect(span)进行收集Span信息,那么Span信息是同步收集的吗?肯定不是的,接着看
调用spanCollector.collect(span)则执行FlushingSpanCollector中的collect方法
@Override public void collect(Span span) { metrics.incrementAcceptedSpans(1); if (!pending.offer(span)) { metrics.incrementDroppedSpans(1); } }首先进行的是metrics统计信息,可以自定义该SpanCollectorMetricsHandler信息收集各指标信息,利用如grafana等展示信息
pending.offer(span)将span信息存储在BlockingQueue中,然后通过定时任务去取出阻塞队列中的值,偷偷摸摸的上传span信息
定时任务利用了Flusher类来执行,在构造FlushingSpanCollector时构造了Flusher类
static final class Flusher implements Runnable { final Flushable flushable; final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); Flusher(Flushable flushable, int flushInterval) { this.flushable = flushable; this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS); } @Override public void run() { try { flushable.flush(); } catch (IOException ignored) { } } }创建了一个核心线程数为1的线程池,每间隔flushInterval秒执行一次Span信息上传,执行flush方法
@Override public void flush() { if (pending.isEmpty()) return; List<Span> drained = new ArrayList<Span>(pending.size()); pending.drainTo(drained); if (drained.isEmpty()) return; int spanCount = drained.size(); try { reportSpans(drained); } catch (IOException e) { metrics.incrementDroppedSpans(spanCount); } catch (RuntimeException e) { metrics.incrementDroppedSpans(spanCount); } }首先将阻塞队列中的值全部取出存如集合中,最后调用reportSpans(List<Span> drained)抽象方法,该方法在AbstractSpanCollector得到覆写
@Override protected void reportSpans(List<Span> drained) throws IOException { byte[] encoded = codec.writeSpans(drained); sendSpans(encoded); }转换成字节流后调用sendSpans抽象方法发送Span信息,此时就回到一开始说的HttpSpanCollector通过HttpURLConnection实现的sendSpans方法。