diff --git a/README.md b/README.md index e1e0822..1e73e26 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ # InfluxDB .NET Collector [![Build status](https://ci.appveyor.com/api/projects/status/0tqovixkf1e1pqu3/branch/master?svg=true)](https://ci.appveyor.com/project/NicholasBlumhardt/influxdb-lineprotocol/branch/master) [![NuGet Version](http://img.shields.io/nuget/v/InfluxDB.LineProtocol.svg?style=flat)](https://www.nuget.org/packages/InfluxDB.LineProtocol/) -This is a C# implementation of the [InfluxDB](http://influxdb.org) ingestion ['Line Protocol'](https://influxdb.com/docs/v0.9/write_protocols/line.html). +### Note: This library is for use with InfluxDB 1.x. For connecting to InfluxDB 2.x instances, please use the [influxdb-client-csharp](https://github.com/influxdata/influxdb-client-csharp) client. + +This is a C# implementation of the [InfluxDB](http://influxdb.org) ingestion ['Line Protocol'](https://docs.influxdata.com/influxdb/latest/write_protocols/line_protocol_tutorial/). You can use it to write time series data to InfluxDB version 0.9.3+ over HTTP or HTTPS. Two packages are provided: @@ -48,7 +50,7 @@ Metrics.Write("cpu_time", Metrics.Measure("working_set", process.WorkingSet64); ``` -View aggregated metrics in a dashboarding interface such as [Grafana](http://grafana.org). +View aggregated metrics in a dashboarding interface such as [Chronograf](https://www.influxdata.com/time-series-platform/chronograf/) or [Grafana](http://grafana.org). ## Raw Client API diff --git a/influxdb-csharp.sln b/influxdb-csharp.sln index c709e6c..35103bb 100644 --- a/influxdb-csharp.sln +++ b/influxdb-csharp.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.26730.3 +VisualStudioVersion = 15.0.27130.2036 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{72DC28B9-37B5-425C-8532-5CA91D253A70}" EndProject @@ -31,6 +31,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "InfluxDB.Collector", "src\I EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Benchmark", "sample\Benchmark\Benchmark.csproj", "{2A34EE83-FB59-4A41-8BB5-174BE678533E}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "InfluxDb.UdpSupport.ConsoleTest", "test\Consoles\InfluxDb.UdpSupport.ConsoleTest\InfluxDb.UdpSupport.ConsoleTest.csproj", "{1B138F1D-A772-4656-9142-442DEFB969E5}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample-UDP-Support", "sample\Sample-UDP-Support\Sample-UDP-Support.csproj", "{62A1A1E7-CCF5-49D9-B462-F11EEF7F5591}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -57,6 +61,14 @@ Global {2A34EE83-FB59-4A41-8BB5-174BE678533E}.Debug|Any CPU.Build.0 = Debug|Any CPU {2A34EE83-FB59-4A41-8BB5-174BE678533E}.Release|Any CPU.ActiveCfg = Release|Any CPU {2A34EE83-FB59-4A41-8BB5-174BE678533E}.Release|Any CPU.Build.0 = Release|Any CPU + {1B138F1D-A772-4656-9142-442DEFB969E5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1B138F1D-A772-4656-9142-442DEFB969E5}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1B138F1D-A772-4656-9142-442DEFB969E5}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1B138F1D-A772-4656-9142-442DEFB969E5}.Release|Any CPU.Build.0 = Release|Any CPU + {62A1A1E7-CCF5-49D9-B462-F11EEF7F5591}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {62A1A1E7-CCF5-49D9-B462-F11EEF7F5591}.Debug|Any CPU.Build.0 = Debug|Any CPU + {62A1A1E7-CCF5-49D9-B462-F11EEF7F5591}.Release|Any CPU.ActiveCfg = Release|Any CPU + {62A1A1E7-CCF5-49D9-B462-F11EEF7F5591}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -67,6 +79,8 @@ Global {DC6028D6-ED1D-4857-B5EB-28BA05E3F531} = {CD65EE64-FDA8-4ED9-A7F2-81BDD9F64C64} {F690F3E3-D9F0-441A-9E70-4F70998BDD1B} = {72DC28B9-37B5-425C-8532-5CA91D253A70} {2A34EE83-FB59-4A41-8BB5-174BE678533E} = {CD65EE64-FDA8-4ED9-A7F2-81BDD9F64C64} + {1B138F1D-A772-4656-9142-442DEFB969E5} = {75C71D21-E6FD-493F-A355-997EEF4DDF11} + {62A1A1E7-CCF5-49D9-B462-F11EEF7F5591} = {CD65EE64-FDA8-4ED9-A7F2-81BDD9F64C64} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {AB0C6BCE-235A-4018-8644-7652EC826FF5} diff --git a/sample/Benchmark/Benchmark.csproj b/sample/Benchmark/Benchmark.csproj index e279cb4..f4d71e2 100644 --- a/sample/Benchmark/Benchmark.csproj +++ b/sample/Benchmark/Benchmark.csproj @@ -1,7 +1,7 @@  - net46 + net461 Sample Sample @@ -10,8 +10,8 @@ - - + + diff --git a/sample/Sample-UDP-Support/App.config b/sample/Sample-UDP-Support/App.config new file mode 100644 index 0000000..731f6de --- /dev/null +++ b/sample/Sample-UDP-Support/App.config @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/sample/Sample-UDP-Support/Program.cs b/sample/Sample-UDP-Support/Program.cs new file mode 100644 index 0000000..780e289 --- /dev/null +++ b/sample/Sample-UDP-Support/Program.cs @@ -0,0 +1,73 @@ +using InfluxDB.Collector; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Threading.Tasks; + +// influx-db command line: +// start with +// # influx +// # show databases +// # CREATE DATABASE {name} +// # DROP DATABASE {name} +// # precision rfc3339 +// # use +// # SHOW MEASUREMENTS +// # SHOW MEASUREMENTS WITH MEASUREMENT =~ /v1\..*/ -- all fields from measurements that start with 'v1.' +// # SHOW SERIES +// # SHOW SERIES [FROM [WHERE ='']] +// # DROP SERIES FROM /v1.*\.end/ +// # SHOW TAG KEYS +// # SHOW TAG KEYS FROM "v1.cos" +// # SHOW FIELD KEYS +// # SHOW FIELD KEYS FROM /v1\..*\.sin/ -- all fields from series that start with 'v1.' and end with '.sin' + +/* +# influx +docker run --name influx -p 8086:8086 -p 8089:8089/udp -p 8088:8088 -v C:\Docker\Volumes\influxdb\db:/var/lib/influxdb -v C:\Docker\Volumes\influxdb\config\influxdb.conf:/etc/influxdb/influxdb.conf:ro influxdb -config /etc/influxdb/influxdb.conf +docker run -d -p 8083:8083 -p 8086:8086 -p 8089:4444/udp --expose 8083 --expose 8086 --expose 4444 -e UDP_DB="playground" tutum/influxdb + +*/ +namespace Sample +{ + public static class Program + { + public static void Main(string[] args) + { + Collect().Wait(); + + Console.ReadKey(); + } + + async static Task Collect() + { + var process = Process.GetCurrentProcess(); + + Metrics.Collector = new CollectorConfiguration() + .Tag.With("host", Environment.GetEnvironmentVariable("COMPUTERNAME")) + .Tag.With("os", Environment.GetEnvironmentVariable("OS")) + .Tag.With("process", Path.GetFileName(process.MainModule.FileName)) + .Batch.AtInterval(TimeSpan.FromSeconds(2)) + //.WriteTo.InfluxDB("http://localhost:8086", "data") + .WriteTo.InfluxDB("udp://localhost:8089", "data") + .CreateCollector(); + + while (true) + { + Metrics.Increment("iterations"); + + Metrics.Write("cpu_time", + new Dictionary + { + { "value", process.TotalProcessorTime.TotalMilliseconds }, + { "user", process.UserProcessorTime.TotalMilliseconds } + }); + + Metrics.Measure("working_set", process.WorkingSet64); + + await Task.Delay(1000); + } + } + } +} diff --git a/sample/Sample-UDP-Support/Properties/AssemblyInfo.cs b/sample/Sample-UDP-Support/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..e675b47 --- /dev/null +++ b/sample/Sample-UDP-Support/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Sample-UDP-Support")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Sample-UDP-Support")] +[assembly: AssemblyCopyright("Copyright © 2018")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("62a1a1e7-ccf5-49d9-b462-f11eef7f5591")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/sample/Sample-UDP-Support/Sample-UDP-Support.csproj b/sample/Sample-UDP-Support/Sample-UDP-Support.csproj new file mode 100644 index 0000000..c4b4f3d --- /dev/null +++ b/sample/Sample-UDP-Support/Sample-UDP-Support.csproj @@ -0,0 +1,63 @@ + + + + + Debug + AnyCPU + {62A1A1E7-CCF5-49D9-B462-F11EEF7F5591} + Exe + Sample_UDP_Support + Sample-UDP-Support + v4.6.1 + 512 + true + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + + + + + + + + + + + + + + + + + + + {f690f3e3-d9f0-441a-9e70-4f70998bdd1b} + InfluxDB.Collector + + + {069e0ac5-a2cf-4584-89a7-f475276e244c} + InfluxDB.LineProtocol + + + + \ No newline at end of file diff --git a/sample/Sample-UDP-Support/packages.config b/sample/Sample-UDP-Support/packages.config new file mode 100644 index 0000000..2b5b5a8 --- /dev/null +++ b/sample/Sample-UDP-Support/packages.config @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/sample/Sample/Sample.csproj b/sample/Sample/Sample.csproj index 9a37e34..d713742 100644 --- a/sample/Sample/Sample.csproj +++ b/sample/Sample/Sample.csproj @@ -1,7 +1,7 @@  - net452;netcoreapp1.0 + net461;netcoreapp2.0 Sample Sample diff --git a/src/InfluxDB.Collector/Configuration/CollectorBatchConfiguration.cs b/src/InfluxDB.Collector/Configuration/CollectorBatchConfiguration.cs index b40f113..f662d42 100644 --- a/src/InfluxDB.Collector/Configuration/CollectorBatchConfiguration.cs +++ b/src/InfluxDB.Collector/Configuration/CollectorBatchConfiguration.cs @@ -4,6 +4,8 @@ namespace InfluxDB.Collector.Configuration { public abstract class CollectorBatchConfiguration { - public abstract CollectorConfiguration AtInterval(TimeSpan interval); + public CollectorConfiguration AtInterval(TimeSpan interval) => AtInterval(interval, 5000); + + public abstract CollectorConfiguration AtInterval(TimeSpan interval, int? maxBatchSize); } -} +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Configuration/PipelinedCollectorBatchConfiguration.cs b/src/InfluxDB.Collector/Configuration/PipelinedCollectorBatchConfiguration.cs index ffbd60f..00eca5f 100644 --- a/src/InfluxDB.Collector/Configuration/PipelinedCollectorBatchConfiguration.cs +++ b/src/InfluxDB.Collector/Configuration/PipelinedCollectorBatchConfiguration.cs @@ -8,6 +8,7 @@ class PipelinedCollectorBatchConfiguration : CollectorBatchConfiguration { readonly CollectorConfiguration _configuration; TimeSpan? _interval; + int? _maxBatchSize; public PipelinedCollectorBatchConfiguration(CollectorConfiguration configuration) { @@ -15,9 +16,10 @@ public PipelinedCollectorBatchConfiguration(CollectorConfiguration configuration _configuration = configuration; } - public override CollectorConfiguration AtInterval(TimeSpan interval) + public override CollectorConfiguration AtInterval(TimeSpan interval, int? maxBatchSize) { _interval = interval; + _maxBatchSize = maxBatchSize; return _configuration; } @@ -29,9 +31,9 @@ public IPointEmitter CreateEmitter(IPointEmitter parent, out Action dispose) return parent; } - var batcher = new IntervalBatcher(_interval.Value, parent); + var batcher = new IntervalBatcher(_interval.Value, _maxBatchSize, parent); dispose = batcher.Dispose; return batcher; } } -} +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Configuration/PipelinedCollectorEmitConfiguration.cs b/src/InfluxDB.Collector/Configuration/PipelinedCollectorEmitConfiguration.cs index 1a72208..81666b1 100644 --- a/src/InfluxDB.Collector/Configuration/PipelinedCollectorEmitConfiguration.cs +++ b/src/InfluxDB.Collector/Configuration/PipelinedCollectorEmitConfiguration.cs @@ -11,9 +11,10 @@ class PipelinedCollectorEmitConfiguration : CollectorEmitConfiguration { readonly CollectorConfiguration _configuration; readonly List> _emitters = new List>(); - LineProtocolClient _client; + private ILineProtocolClient _client; - public PipelinedCollectorEmitConfiguration(CollectorConfiguration configuration) + public PipelinedCollectorEmitConfiguration( + CollectorConfiguration configuration) { if (configuration == null) throw new ArgumentNullException(nameof(configuration)); _configuration = configuration; @@ -21,7 +22,10 @@ public PipelinedCollectorEmitConfiguration(CollectorConfiguration configuration) public override CollectorConfiguration InfluxDB(Uri serverBaseAddress, string database, string username = null, string password = null) { - _client = new LineProtocolClient(serverBaseAddress, database, username, password); + if (string.Compare(serverBaseAddress.Scheme, "udp", ignoreCase: true) == 0) + _client = new LineProtocolUdpClient(serverBaseAddress, database, username, password); + else + _client = new LineProtocolClient(serverBaseAddress, database, username, password); return _configuration; } diff --git a/src/InfluxDB.Collector/InfluxDB.Collector.csproj b/src/InfluxDB.Collector/InfluxDB.Collector.csproj index ad066a8..e902178 100644 --- a/src/InfluxDB.Collector/InfluxDB.Collector.csproj +++ b/src/InfluxDB.Collector/InfluxDB.Collector.csproj @@ -3,10 +3,10 @@ A minimal metrics collection API for InfluxDB influxdb-csharp Contributors - net45;netstandard1.3 + netstandard2.0 true InfluxDB.Collector - 1.1.0 + 1.1.1 InfluxDB.Collector influxdb https://raw.githubusercontent.com/influxdata/influxdb-csharp/master/asset/influxdata.jpg @@ -17,22 +17,15 @@ + - - - - - - - - - - - - - - - + + + + + + + diff --git a/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs b/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs index c7ccfe5..521ca6f 100644 --- a/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs +++ b/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs @@ -1,8 +1,10 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using InfluxDB.Collector.Diagnostics; using InfluxDB.Collector.Platform; +using InfluxDB.Collector.Util; namespace InfluxDB.Collector.Pipeline.Batch { @@ -12,6 +14,7 @@ class IntervalBatcher : IPointEmitter, IDisposable Queue _queue = new Queue(); readonly TimeSpan _interval; + readonly int? _maxBatchSize; readonly IPointEmitter _parent; readonly object _stateLock = new object(); @@ -19,10 +22,11 @@ class IntervalBatcher : IPointEmitter, IDisposable bool _unloading; bool _started; - public IntervalBatcher(TimeSpan interval, IPointEmitter parent) + public IntervalBatcher(TimeSpan interval, int? maxBatchSize, IPointEmitter parent) { _parent = parent; _interval = interval; + _maxBatchSize = maxBatchSize; _timer = new PortableTimer(cancel => OnTick()); } @@ -60,7 +64,17 @@ Task OnTick() _queue = new Queue(); } - _parent.Emit(batch.ToArray()); + if (_maxBatchSize == null || batch.Count <= _maxBatchSize.Value) + { + _parent.Emit(batch.ToArray()); + } + else + { + foreach (var chunk in batch.Batch(_maxBatchSize.Value)) + { + _parent.Emit(chunk.ToArray()); + } + } } catch (Exception ex) { diff --git a/src/InfluxDB.Collector/Pipeline/Emit/HttpLineProtocolEmitter.cs b/src/InfluxDB.Collector/Pipeline/Emit/HttpLineProtocolEmitter.cs index 78988b9..529cb42 100644 --- a/src/InfluxDB.Collector/Pipeline/Emit/HttpLineProtocolEmitter.cs +++ b/src/InfluxDB.Collector/Pipeline/Emit/HttpLineProtocolEmitter.cs @@ -7,9 +7,9 @@ namespace InfluxDB.Collector.Pipeline.Emit { class HttpLineProtocolEmitter : IDisposable, IPointEmitter { - readonly LineProtocolClient _client; + readonly ILineProtocolClient _client; - public HttpLineProtocolEmitter(LineProtocolClient client) + public HttpLineProtocolEmitter(ILineProtocolClient client) { if (client == null) throw new ArgumentNullException(nameof(client)); _client = client; diff --git a/src/InfluxDB.Collector/Util/EnumerableExtensions.cs b/src/InfluxDB.Collector/Util/EnumerableExtensions.cs new file mode 100644 index 0000000..aab0236 --- /dev/null +++ b/src/InfluxDB.Collector/Util/EnumerableExtensions.cs @@ -0,0 +1,55 @@ +using System.Collections.Generic; +using System.Linq; + +namespace InfluxDB.Collector.Util +{ + internal static class EnumerableExtensions + { + // Copied from https://github.com/morelinq/MoreLINQ/blob/master/MoreLinq/Batch.cs + // Original license below + // + // MoreLINQ - Extensions to LINQ to Objects + // Copyright (c) 2009 Atif Aziz. All rights reserved. + // + // Licensed under the Apache License, Version 2.0 (the "License"); + // you may not use this file except in compliance with the License. + // You may obtain a copy of the License at + // + // http://www.apache.org/licenses/LICENSE-2.0 + // + // Unless required by applicable law or agreed to in writing, software + // distributed under the License is distributed on an "AS IS" BASIS, + // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + // See the License for the specific language governing permissions and + // limitations under the License. + public static IEnumerable> Batch(this IEnumerable source, int size) + { + TSource[] bucket = null; + var count = 0; + + foreach (var item in source) + { + if (bucket == null) + { + bucket = new TSource[size]; + } + + bucket[count++] = item; + if (count != size) + { + continue; + } + + yield return bucket; + + bucket = null; + count = 0; + } + + if (bucket != null && count > 0) + { + yield return bucket.Take(count); + } + } + } +} \ No newline at end of file diff --git a/src/InfluxDB.LineProtocol/Client/ILineProtocolClient.cs b/src/InfluxDB.LineProtocol/Client/ILineProtocolClient.cs new file mode 100644 index 0000000..65dad59 --- /dev/null +++ b/src/InfluxDB.LineProtocol/Client/ILineProtocolClient.cs @@ -0,0 +1,16 @@ +using System.Threading; +using System.Threading.Tasks; +using InfluxDB.LineProtocol.Payload; + +namespace InfluxDB.LineProtocol.Client +{ + public interface ILineProtocolClient + { + Task SendAsync( + LineProtocolWriter lineProtocolWriter, + CancellationToken cancellationToken = default(CancellationToken)); + Task WriteAsync( + LineProtocolPayload payload, + CancellationToken cancellationToken = default(CancellationToken)); + } +} \ No newline at end of file diff --git a/src/InfluxDB.LineProtocol/Client/LineProtocolClient.cs b/src/InfluxDB.LineProtocol/Client/LineProtocolClient.cs index 8737141..279962c 100644 --- a/src/InfluxDB.LineProtocol/Client/LineProtocolClient.cs +++ b/src/InfluxDB.LineProtocol/Client/LineProtocolClient.cs @@ -1,50 +1,45 @@ using InfluxDB.LineProtocol.Payload; using System; using System.IO; +using System.Net; using System.Net.Http; +using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks; namespace InfluxDB.LineProtocol.Client { - public class LineProtocolClient + public class LineProtocolClient : LineProtocolClientBase { - readonly HttpClient _httpClient; - readonly string _database, _username, _password; + private readonly HttpClient _httpClient; public LineProtocolClient(Uri serverBaseAddress, string database, string username = null, string password = null) : this(new HttpClientHandler(), serverBaseAddress, database, username, password) { } - protected LineProtocolClient(HttpMessageHandler handler, Uri serverBaseAddress, string database, string username, string password) + protected LineProtocolClient( + HttpMessageHandler handler, + Uri serverBaseAddress, + string database, + string username, + string password) + :base(serverBaseAddress, database, username, password) { - if (serverBaseAddress == null) throw new ArgumentNullException(nameof(serverBaseAddress)); - if (string.IsNullOrEmpty(database)) throw new ArgumentException("A database must be specified"); + if (serverBaseAddress == null) + throw new ArgumentNullException(nameof(serverBaseAddress)); + if (string.IsNullOrEmpty(database)) + throw new ArgumentException("A database must be specified"); // Overload that allows injecting handler is protected to avoid HttpMessageHandler being part of our public api which would force clients to reference System.Net.Http when using the lib. _httpClient = new HttpClient(handler) { BaseAddress = serverBaseAddress }; - _database = database; - _username = username; - _password = password; } - public Task WriteAsync(LineProtocolPayload payload, CancellationToken cancellationToken = default(CancellationToken)) - { - var stringWriter = new StringWriter(); - - payload.Format(stringWriter); - - return SendAsync(stringWriter.ToString(), Precision.Nanoseconds, cancellationToken); - } - - public Task SendAsync(LineProtocolWriter lineProtocolWriter, CancellationToken cancellationToken = default(CancellationToken)) - { - return SendAsync(lineProtocolWriter.ToString(), lineProtocolWriter.Precision, cancellationToken); - } - - private async Task SendAsync(string payload, Precision precision, CancellationToken cancellationToken = default(CancellationToken)) + protected override async Task OnSendAsync( + string payload, + Precision precision, + CancellationToken cancellationToken = default(CancellationToken)) { var endpoint = $"write?db={Uri.EscapeDataString(_database)}"; if (!string.IsNullOrEmpty(_username)) diff --git a/src/InfluxDB.LineProtocol/Client/LineProtocolClientBase.cs b/src/InfluxDB.LineProtocol/Client/LineProtocolClientBase.cs new file mode 100644 index 0000000..d7bb5ca --- /dev/null +++ b/src/InfluxDB.LineProtocol/Client/LineProtocolClientBase.cs @@ -0,0 +1,49 @@ +using InfluxDB.LineProtocol.Payload; +using System; +using System.IO; +using System.Net; +using System.Net.Http; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace InfluxDB.LineProtocol.Client +{ + public abstract class LineProtocolClientBase : ILineProtocolClient + { + protected readonly string _database, _username, _password; + + protected LineProtocolClientBase(Uri serverBaseAddress, string database, string username, string password) + { + if (serverBaseAddress == null) + throw new ArgumentNullException(nameof(serverBaseAddress)); + if (string.IsNullOrEmpty(database)) + throw new ArgumentException("A database must be specified"); + + // Overload that allows injecting handler is protected to avoid HttpMessageHandler being part of our public api which would force clients to reference System.Net.Http when using the lib. + _database = database; + _username = username; + _password = password; + } + + public Task WriteAsync(LineProtocolPayload payload, CancellationToken cancellationToken = default(CancellationToken)) + { + var stringWriter = new StringWriter(); + + payload.Format(stringWriter); + + return OnSendAsync(stringWriter.ToString(), Precision.Nanoseconds, cancellationToken); + } + + public Task SendAsync(LineProtocolWriter lineProtocolWriter, CancellationToken cancellationToken = default(CancellationToken)) + { + return OnSendAsync(lineProtocolWriter.ToString(), lineProtocolWriter.Precision, cancellationToken); + } + + protected abstract Task OnSendAsync( + string payload, + Precision precision, + CancellationToken cancellationToken = default(CancellationToken)); + } +} diff --git a/src/InfluxDB.LineProtocol/Client/LineProtocolUdpClient.cs b/src/InfluxDB.LineProtocol/Client/LineProtocolUdpClient.cs new file mode 100644 index 0000000..e76be29 --- /dev/null +++ b/src/InfluxDB.LineProtocol/Client/LineProtocolUdpClient.cs @@ -0,0 +1,46 @@ +using InfluxDB.LineProtocol.Payload; +using System; +using System.IO; +using System.Net; +using System.Net.Http; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace InfluxDB.LineProtocol.Client +{ + public class LineProtocolUdpClient : LineProtocolClientBase + { + private readonly UdpClient _udpClient; + private readonly string _udpHostName; + private readonly int _udpPort; + + public LineProtocolUdpClient( + Uri serverBaseAddress, + string database, + string username = null, + string password = null) + :base(serverBaseAddress, database, username, password) + { + if (serverBaseAddress == null) + throw new ArgumentNullException(nameof(serverBaseAddress)); + if (string.IsNullOrEmpty(database)) + throw new ArgumentException("A database must be specified"); + + _udpHostName = serverBaseAddress.Host; + _udpPort = serverBaseAddress.Port; + _udpClient = new UdpClient(); + } + + protected override async Task OnSendAsync( + string payload, + Precision precision, + CancellationToken cancellationToken = default(CancellationToken)) + { + var buffer = Encoding.UTF8.GetBytes(payload); + int len = await _udpClient.SendAsync(buffer, buffer.Length, _udpHostName, _udpPort); + return new LineProtocolWriteResult(len == buffer.Length, null); + } + } +} diff --git a/src/InfluxDB.LineProtocol/InfluxDB.LineProtocol.csproj b/src/InfluxDB.LineProtocol/InfluxDB.LineProtocol.csproj index 91eeda3..5b7facb 100644 --- a/src/InfluxDB.LineProtocol/InfluxDB.LineProtocol.csproj +++ b/src/InfluxDB.LineProtocol/InfluxDB.LineProtocol.csproj @@ -3,8 +3,8 @@ A .NET library for efficiently sending time series to InfluxDB influxdb-csharp Contributors - net45;netstandard1.3 - 1.1.0 + netstandard2.0 + 1.1.1 true InfluxDB.LineProtocol InfluxDB.LineProtocol @@ -16,17 +16,12 @@ true - - - - - - - - - - - - + + + + + + + diff --git a/test/Consoles/InfluxDb.UdpSupport.ConsoleTest/App.config b/test/Consoles/InfluxDb.UdpSupport.ConsoleTest/App.config new file mode 100644 index 0000000..016d28f --- /dev/null +++ b/test/Consoles/InfluxDb.UdpSupport.ConsoleTest/App.config @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/test/Consoles/InfluxDb.UdpSupport.ConsoleTest/InfluxDb.UdpSupport.ConsoleTest.csproj b/test/Consoles/InfluxDb.UdpSupport.ConsoleTest/InfluxDb.UdpSupport.ConsoleTest.csproj new file mode 100644 index 0000000..6a88d2a --- /dev/null +++ b/test/Consoles/InfluxDb.UdpSupport.ConsoleTest/InfluxDb.UdpSupport.ConsoleTest.csproj @@ -0,0 +1,62 @@ + + + + + Debug + AnyCPU + {1B138F1D-A772-4656-9142-442DEFB969E5} + Exe + InfluxDb.UdpSupport.ConsoleTest + InfluxDb.UdpSupport.ConsoleTest + v4.7 + 512 + true + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + + + + + + + + + + + + + + + + + + {f690f3e3-d9f0-441a-9e70-4f70998bdd1b} + InfluxDB.Collector + + + {069e0ac5-a2cf-4584-89a7-f475276e244c} + InfluxDB.LineProtocol + + + + \ No newline at end of file diff --git a/test/Consoles/InfluxDb.UdpSupport.ConsoleTest/Program.cs b/test/Consoles/InfluxDb.UdpSupport.ConsoleTest/Program.cs new file mode 100644 index 0000000..c2d0f85 --- /dev/null +++ b/test/Consoles/InfluxDb.UdpSupport.ConsoleTest/Program.cs @@ -0,0 +1,89 @@ +using InfluxDB.Collector; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +// influx-db command line: +// start with +// # influx +// # show databases +// # CREATE DATABASE {name} +// # DROP DATABASE {name} +// # precision rfc3339 +// # use +// # SHOW MEASUREMENTS +// # SHOW MEASUREMENTS WITH MEASUREMENT =~ /v1\..*/ -- all fields from measurements that start with 'v1.' +// # SHOW SERIES +// # SHOW SERIES [FROM [WHERE ='']] +// # DROP SERIES FROM /v1.*\.end/ +// # SHOW TAG KEYS +// # SHOW TAG KEYS FROM "v1.cos" +// # SHOW FIELD KEYS +// # SHOW FIELD KEYS FROM /v1\..*\.sin/ -- all fields from series that start with 'v1.' and end with '.sin' + +/* +# influx +docker run --name influx -p 8086:8086 -p 8089:8089/udp -p 8088:8088 -v C:\Docker\Volumes\influxdb\db:/var/lib/influxdb -v C:\Docker\Volumes\influxdb\config\influxdb.conf:/etc/influxdb/influxdb.conf:ro influxdb -config /etc/influxdb/influxdb.conf +docker run -d -p 8083:8083 -p 8086:8086 -p 8089:4444/udp --expose 8083 --expose 8086 --expose 4444 -e UDP_DB="playground" tutum/influxdb + +*/ + +namespace InfluxDb.UdpSupport.ConsoleTest +{ + class Program + { + static void Main(string[] args) + { + Task t = ListenAsync(); + var process = Process.GetCurrentProcess(); + + Metrics.Collector = new CollectorConfiguration() + .Tag.With("process", Path.GetFileName(process.Id.ToString())) + .Batch.AtInterval(TimeSpan.FromSeconds(2)) + .WriteTo.InfluxDB("udp://localhost:8089", "data") + .CreateCollector(); + + int i = 0; + while (true) + { + Metrics.Collector.Increment("test", i++ % 10); + Thread.Sleep(500); + } + } + + private static async Task ListenAsync() + { + await Task.Delay(1); + var udpClient = new UdpClient(8999); + while (true) + { + try + { + UdpReceiveResult result = await udpClient.ReceiveAsync(); + Byte[] receiveBytes = result.Buffer; + string returnData = Encoding.UTF8.GetString(receiveBytes); + IPEndPoint remoteIpEndPoint = result.RemoteEndPoint; + + // Uses the IPEndPoint object to determine which of these two hosts responded. + Console.WriteLine($"This message was sent from {remoteIpEndPoint.Address} on their port number {remoteIpEndPoint.Port}"); + Console.WriteLine("This is the message you received:"); + Console.ForegroundColor = ConsoleColor.Yellow; + Console.WriteLine(returnData); + Console.ResetColor(); + Console.WriteLine("-----------------------------------------------------------------"); + } + catch (Exception e) + { + Console.WriteLine(e.ToString()); + } + } + } + } +} diff --git a/test/Consoles/InfluxDb.UdpSupport.ConsoleTest/Properties/AssemblyInfo.cs b/test/Consoles/InfluxDb.UdpSupport.ConsoleTest/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..430ca65 --- /dev/null +++ b/test/Consoles/InfluxDb.UdpSupport.ConsoleTest/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("InfluxDb.UdpSupport.ConsoleTest")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("InfluxDb.UdpSupport.ConsoleTest")] +[assembly: AssemblyCopyright("Copyright © 2018")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("1b138f1d-a772-4656-9142-442defb969e5")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/test/InfluxDB.LineProtocol.Tests/InfluxDB.LineProtocol.Tests.csproj b/test/InfluxDB.LineProtocol.Tests/InfluxDB.LineProtocol.Tests.csproj index 37f87dc..44d713c 100644 --- a/test/InfluxDB.LineProtocol.Tests/InfluxDB.LineProtocol.Tests.csproj +++ b/test/InfluxDB.LineProtocol.Tests/InfluxDB.LineProtocol.Tests.csproj @@ -1,12 +1,10 @@  - net452;netcoreapp1.0 + netcoreapp2.0 InfluxDB.LineProtocol.Tests InfluxDB.LineProtocol.Tests - true - $(PackageTargetFallback);dnxcore50;portable-net45+win8 - 1.0.4 + 2.0.0 @@ -15,11 +13,11 @@ - - - - - + + + + +