foreach (var property in value)
{
var propertyKey = $"{tuple.Key}/{property.Key}";
switch (property.Value.Type)
{
case JTokenType.Object:
foreach (var item in Flatten(KeyValuePair.Create(propertyKey, property.Value)))
yield return item;
break;
case JTokenType.Array:
break;
default:
yield return KeyValuePair.Create(propertyKey, property.Value.Value<string>());
break;
}
}
}
}
我们可以更进一步,使用 consul 的变更通知。它的工作方式很简单:只需在请求中添加一个参数(上一次获取配置时返回的索引值),HTTP 请求就会一直阻塞,直到下一次配置变更(或 HttpClient 超时)。
与前面的类相比,我们只需添加一个方法 ListenToConfigurationChanges,以便在后台监听 consul 的阻塞 HTTP 请求。
public class ConsulConfigurationProvider : ConfigurationProvider
{
// Name of the "X-Consul-Index" HTTP header. Not referenced in the visible part of the class;
// NOTE(review): presumably read from consul responses to track the configuration index — confirm in ExecuteQueryAsync.
private const string ConsulIndexHeader = "X-Consul-Index";
// The path argument passed to the constructor, stored unchanged.
private readonly string _path;
// HTTP client created in the constructor with automatic Deflate/GZip decompression enabled.
private readonly HttpClient _httpClient;
// One URL per supplied consul base URL, each combined with "v1/kv/{path}". Never empty (the constructor throws otherwise).
private readonly IReadOnlyList<Uri> _consulUrls;
// Task wrapping ListenToConfigurationChanges; created unstarted in the constructor and started by LoadAsync.
private readonly Task _configurationListeningTask;
// Index into _consulUrls; advanced (wrapping around) each time the listening loop catches a failure.
private int _consulUrlIndex;
// Number of failures counted by the listening loop; reset on success, on TaskCanceledException, and before backing off.
private int _failureCount;
// Not used in the visible part of the class.
// NOTE(review): presumably the last index value reported by consul — confirm in ExecuteQueryAsync.
private int _consulConfigurationIndex;
/// <summary>
/// Creates a configuration provider that targets one or more consul endpoints.
/// </summary>
/// <param name="consulUrls">Base URLs of the consul endpoints; each one is combined with <c>v1/kv/{path}</c>.</param>
/// <param name="path">Path appended after <c>v1/kv/</c> on every consul URL.</param>
/// <exception cref="ArgumentNullException"><paramref name="consulUrls"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="consulUrls"/> contains no URL.</exception>
public ConsulConfigurationProvider(IEnumerable<Uri> consulUrls, string path)
{
    // Report the caller's parameter name instead of letting LINQ throw for its internal "source" argument.
    if (consulUrls == null)
    {
        throw new ArgumentNullException(nameof(consulUrls));
    }

    _path = path;
    _consulUrls = consulUrls.Select(u => new Uri(u, $"v1/kv/{path}")).ToList();
    if (_consulUrls.Count <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(consulUrls));
    }

    // The handler is owned by the client (disposeHandler: true).
    _httpClient = new HttpClient(
        new HttpClientHandler { AutomaticDecompression = DecompressionMethods.Deflate | DecompressionMethods.GZip },
        true);

    // Created unstarted; LoadAsync starts it once a load has completed.
    _configurationListeningTask = new Task(ListenToConfigurationChanges);
}
/// <summary>
/// Synchronous entry point: blocks the calling thread until <c>LoadAsync</c> has finished,
/// rethrowing any exception it produced.
/// </summary>
public override void Load()
{
    var awaiter = LoadAsync().ConfigureAwait(false).GetAwaiter();
    awaiter.GetResult();
}
/// <summary>
/// Replaces <c>Data</c> with the result of <c>ExecuteQueryAsync</c> and, if the background
/// listening task has not been started yet, starts it.
/// </summary>
/// <returns>A task that completes once the data has been assigned and the listener start has been attempted.</returns>
private async Task LoadAsync()
{
    // Load() blocks synchronously on this task, so the continuation must not be posted back
    // to a captured SynchronizationContext: on a single-threaded context that would deadlock.
    Data = await ExecuteQueryAsync().ConfigureAwait(false);

    // The task is created unstarted in the constructor; only start it once.
    if (_configurationListeningTask.Status == TaskStatus.Created)
    {
        _configurationListeningTask.Start();
    }
}
/// <summary>
/// Endless loop that calls <c>ExecuteQueryAsync(true)</c>, publishes each result through
/// <c>Data</c> and raises <c>OnReload</c>. Never returns and never lets an exception escape.
/// </summary>
/// <remarks>
/// NOTE(review): the <c>true</c> argument presumably selects consul's blocking query — confirm in ExecuteQueryAsync.
/// </remarks>
private async void ListenToConfigurationChanges()
{
    for (; ; )
    {
        try
        {
            // More failures than configured URLs: reset the counter and wait a minute before retrying.
            var tooManyFailures = _failureCount > _consulUrls.Count;
            if (tooManyFailures)
            {
                _failureCount = 0;
                await Task.Delay(TimeSpan.FromMinutes(1));
            }

            var refreshed = await ExecuteQueryAsync(true);
            Data = refreshed;
            OnReload();
            _failureCount = 0;
        }
        catch (TaskCanceledException)
        {
            // Cancellation is not counted as a failure; just query again.
            _failureCount = 0;
        }
        catch
        {
            // Any other error: switch to the next URL (wrapping around) and count the failure.
            var nextUrlIndex = _consulUrlIndex + 1;
            _consulUrlIndex = nextUrlIndex % _consulUrls.Count;
            _failureCount += 1;
        }
    }
}