From 8d2c0b23721a32c8d1ab79c68c02e4ea8f2feeff Mon Sep 17 00:00:00 2001
From: Niko Ehrenfeuchter <nikolaus.ehrenfeuchter@unibas.ch>
Date: Sat, 21 Jul 2018 00:29:14 +0200
Subject: [PATCH] Implement I/O queue length performance monitoring

Refers to #35
---
 ATxCommon/ATxCommon.csproj               |   1 +
 ATxCommon/Monitoring/PhysicalDisk.cs     | 213 +++++++++++++++++++++++
 ATxCommon/Serializables/ServiceConfig.cs |  10 ++
 ATxService/AutoTx.cs                     |  39 ++++-
 Resources/conf-minimal/config.common.xml |   5 +
 Resources/conf/config.common.xml         |   5 +
 6 files changed, 268 insertions(+), 5 deletions(-)
 create mode 100644 ATxCommon/Monitoring/PhysicalDisk.cs

diff --git a/ATxCommon/ATxCommon.csproj b/ATxCommon/ATxCommon.csproj
index d281301..c006aed 100644
--- a/ATxCommon/ATxCommon.csproj
+++ b/ATxCommon/ATxCommon.csproj
@@ -48,6 +48,7 @@
     <Compile Include="Conv.cs" />
     <Compile Include="FsUtils.cs" />
     <Compile Include="Monitoring\Cpu.cs" />
+    <Compile Include="Monitoring\PhysicalDisk.cs" />
     <Compile Include="NLog\RateLimitWrapper.cs" />
     <Compile Include="Serializables\DriveToCheck.cs" />
     <Compile Include="Serializables\ServiceConfig.cs" />
diff --git a/ATxCommon/Monitoring/PhysicalDisk.cs b/ATxCommon/Monitoring/PhysicalDisk.cs
new file mode 100644
index 0000000..3ca9317
--- /dev/null
+++ b/ATxCommon/Monitoring/PhysicalDisk.cs
@@ -0,0 +1,213 @@
+using System;
+using System.Diagnostics;
+using System.Linq;
+// using System.Threading;
+using System.Timers;
+using NLog;
+using Timer = System.Timers.Timer;
+
+namespace ATxCommon.Monitoring
+{
+    /// <summary>
+    /// Load monitoring class for physical disks, constantly checking the queue length at the
+    /// given <see cref="Interval"/> in a separate (timer-based) thread.
+    /// 
+    /// The load (=queue length) is determined using a <see cref="PerformanceCounter"/>, and is
+    /// compared against a configurable <see cref="Limit"/>. If the load changes from below the
+    /// limit to above, a <see cref="LoadAboveLimit"/> event will be raised. If the load has been
+    /// above the limit and is then dropping below, an <see cref="OnLoadBelowLimit"/> event will
+    /// be raised as soon as a given number of consecutive load measurements (defined via <see
+    /// cref="Probation"/>) were found to be below the limit.
+    /// </summary>
+    public class PhysicalDisk
+    {
+        private static readonly Logger Log = LogManager.GetCurrentClassLogger();
+
+        /// <summary>
+        /// The generic event handler delegate for PhysicalDisk events.
+        /// </summary>
+        public delegate void EventHandler(object sender, EventArgs e);
+        
+        /// <summary>
+        /// Event raised when the PhysicalDisk load exceeds the limit for any measurement.
+        /// </summary>
+        public event EventHandler LoadAboveLimit;
+
+        /// <summary>
+        /// Event raised when the PhysicalDisk load is below the configured limit for at least
+        /// the number of consecutive measurements configured in <see cref="Probation"/> after
+        /// having exceeded this limit before.
+        /// </summary>
+        public event EventHandler LoadBelowLimit;
+
+        private readonly Timer _monitoringTimer;
+        private readonly PerformanceCounter _diskQueueLength;
+        private readonly float[] _loadReadings = {0F, 0F, 0F, 0F};
+
+        private int _interval;
+        private int _behaving;
+        private int _probation;
+
+        private float _limit;
+
+
+        /// <summary>
+        /// Current PhysicalDisk Queue Length, averaged of the last four readings.
+        /// </summary>
+        /// <returns>The average PhysicalDisk Queue Length from the last four readings.</returns>
+        public float Load { get; private set; }
+
+        /// <summary>
+        /// Flag representing whether the load is considered to be high or low.
+        /// </summary>
+        public bool HighLoad { get; private set; }
+
+        /// <summary>
+        /// How often (in ms) to check the PhysicalDisk Queue Length.
+        /// </summary>
+        public int Interval {
+            get => _interval;
+            set {
+                _interval = value;
+                _monitoringTimer.Interval = value;
+                Log.Debug("PhysicalDisk Queue Length monitoring interval: {0}", _interval);
+            }
+        }
+
+        /// <summary>
+        /// Upper limit of PhysicalDisk load before it is classified as "high".
+        /// </summary>
+        public float Limit {
+            get => _limit;
+            set {
+                _limit = value;
+                Log.Debug("PhysicalDisk monitoring limit: {0:0.000}", _limit);
+            }
+        }
+
+        /// <summary>
+        /// Number of cycles where the PhysicalDisk load value has to be below the limit before it is
+        /// classified as "low" again.
+        /// </summary>
+        public int Probation {
+            get => _probation;
+            set {
+                _probation = value;
+                Log.Debug("PhysicalDisk monitoring probation cycles when violating limit: {0}", _probation);
+            }
+        }
+        
+        /// <summary>
+        /// Indicating whether the PhysicalDisk load monitoring is active.
+        /// </summary>
+        public bool Enabled {
+            get => _monitoringTimer.Enabled;
+            set {
+                Log.Debug("{0} PhysicalDisk monitoring.", value ? "Enabling" : "Disabling");
+                _monitoringTimer.Enabled = value;
+            }
+        }
+
+
+
+        /// <summary>
+        /// Create performance counter and initialize it.
+        /// </summary>
+        /// <param name="counterName">The counter to use for the monitoring, default is the
+        /// overall "Avg. Disk Queue Length", other reasonable options are the corresponding read
+        /// or write queues ("Avg. Disk Read Queue Length" and "Avg. Disk Write Queue Length").
+        /// </param>
+        public PhysicalDisk(string counterName = "Avg. Disk Queue Length") {
+            _interval = 250;
+            _limit = 0.5F;
+            _probation = 40;
+            Log.Info($"Initializing PhysicalDisk performance monitoring for [{counterName}]...");
+            try {
+                Log.Debug("PhysicalDisk monitoring initializing PerformanceCounter...");
+                _diskQueueLength = new PerformanceCounter("PhysicalDisk", counterName, "_Total");
+                var curLoad = _diskQueueLength.NextValue();
+                Log.Debug("PhysicalDisk Queue Length initial value: {0:0.000}", curLoad);
+                /* this initialization doesn't seem to be necessary for PhysicalDisk, so we just
+                 * disable those calls for now:
+                Thread.Sleep(1000);
+                curLoad = _diskQueueLength.NextValue();
+                Log.Debug("PhysicalDisk monitoring current queue length: {0:0.000}", curLoad);
+                 */
+                // now initialize the load state:
+                HighLoad = curLoad > _limit;
+                _monitoringTimer = new Timer(_interval);
+                _monitoringTimer.Elapsed += UpdatePhysicalDiskLoad;
+            }
+            catch (Exception) {
+                Log.Error("Initializing PhysicalDisk monitoring failed!");
+                throw;
+            }
+
+            Log.Debug("Initializing PhysicalDisk monitoring completed.");
+        }
+
+        /// <summary>
+        /// Check current PhysicalDisk queue length, update the history of readings and trigger
+        /// the corresponding events if the required criteria are met.
+        /// </summary>
+        private void UpdatePhysicalDiskLoad(object sender, ElapsedEventArgs e) {
+            _monitoringTimer.Enabled = false;
+            try {
+                // ConstrainedCopy seems to be the most efficient approach to shift the array:
+                Array.ConstrainedCopy(_loadReadings, 1, _loadReadings, 0, 3);
+                _loadReadings[3] = _diskQueueLength.NextValue();
+                Load = _loadReadings.Average();
+                if (_loadReadings[3] > _limit) {
+                    if (_behaving > _probation) {
+                        // this means the load was considered as "low" before, so raise an event:
+                        OnLoadAboveLimit();
+                        Log.Trace("PhysicalDisk Queue Length ({0:0.00}) violating limit ({1})!",
+                            _loadReadings[3], _limit);
+                    } else if (_behaving > 0) {
+                        // this means we were still in probation, so no need to trigger again...
+                        Log.Trace("PhysicalDisk: resetting behaving counter (was {0}).", _behaving);
+                    }
+                    _behaving = 0;
+                } else {
+                    _behaving++;
+                    if (_behaving == _probation) {
+                        Log.Trace("PhysicalDisk Queue Length below limit for {0} cycles, " +
+                                  "passing probation!", _probation);
+                        OnLoadBelowLimit();
+                    } else if (_behaving > _probation) {
+                        Log.Trace("PhysicalDisk Queue Length behaving well since {0} cycles.",
+                            _behaving);
+                    } else if (_behaving < 0) {
+                        Log.Info("PhysicalDisk Queue Length: integer wrap around happened, " +
+                                 "resetting probation counter (no reason to worry).");
+                        _behaving = _probation + 1;
+                    }
+                }
+            }
+            catch (Exception ex) {
+                Log.Error("UpdatePhysicalDiskLoad failed: {0}", ex.Message);
+            }
+            finally {
+                _monitoringTimer.Enabled = true;
+            }
+            Log.Trace("PhysicalDisk Queue Length: {0:0.000} {1}", _loadReadings[3],
+                _loadReadings[3] < Limit ? " [" + _behaving + "]" : "");
+        }
+
+        /// <summary>
+        /// Raise the "LoadAboveLimit" event.
+        /// </summary>
+        protected virtual void OnLoadAboveLimit() {
+            HighLoad = true;
+            LoadAboveLimit?.Invoke(this, EventArgs.Empty);
+        }
+
+        /// <summary>
+        /// Raise the "LoadBelowLimit" event.
+        /// </summary>
+        protected virtual void OnLoadBelowLimit() {
+            HighLoad = false;
+            LoadBelowLimit?.Invoke(this, EventArgs.Empty);
+        }
+    }
+}
\ No newline at end of file
diff --git a/ATxCommon/Serializables/ServiceConfig.cs b/ATxCommon/Serializables/ServiceConfig.cs
index 192fc32..5c3a480 100644
--- a/ATxCommon/Serializables/ServiceConfig.cs
+++ b/ATxCommon/Serializables/ServiceConfig.cs
@@ -63,6 +63,13 @@ namespace ATxCommon.Serializables
         /// </summary>
         public int MaxCpuUsage { get; set; }
 
+        /// <summary>
+        /// Maximum length of the disk queue multiplied by 1000 (so a value of "25" here means the
+        /// queue length is required to be "0.025" or less). Running transfers will be paused if
+        /// this limit is exceeded.
+        /// </summary>
+        public int MaxDiskQueue { get; set; }
+
         /// <summary>
         /// Minimum amount of free RAM (in MB) required for the service to operate.
         /// </summary>
@@ -405,6 +412,7 @@ namespace ATxCommon.Serializables
 
             errmsg += CheckMinValue(c.ServiceTimer, nameof(c.ServiceTimer), 1000);
             errmsg += CheckMinValue(c.MaxCpuUsage, nameof(c.MaxCpuUsage), 5);
+            errmsg += CheckMinValue(c.MaxDiskQueue, nameof(c.MaxDiskQueue), 1);
             errmsg += CheckMinValue(c.MinAvailableMemory, nameof(c.MinAvailableMemory), 256);
 
             // if any of the required parameter checks failed we terminate now as many of the
@@ -454,6 +462,7 @@ namespace ATxCommon.Serializables
 
             WarnOnHighValue(c.ServiceTimer, nameof(c.ServiceTimer), 10000);
             WarnOnHighValue(c.MaxCpuUsage, nameof(c.MaxCpuUsage), 75);
+            WarnOnHighValue(c.MaxDiskQueue, nameof(c.MaxDiskQueue), 2000);
             WarnOnHighValue(c.MinAvailableMemory, nameof(c.MinAvailableMemory), 8192);
             WarnOnHighValue(c.AdminNotificationDelta, nameof(c.AdminNotificationDelta), 1440);
             WarnOnHighValue(c.GraceNotificationDelta, nameof(c.GraceNotificationDelta), 10080);
@@ -493,6 +502,7 @@ namespace ATxCommon.Serializables
                 $"DestinationDirectory: {DestinationDirectory}\n" +
                 $"TmpTransferDir: {TmpTransferDir}\n" +
                 $"MaxCpuUsage: {MaxCpuUsage}%\n" +
+                $"MaxDiskQueue: {MaxDiskQueue} / 1000 (effectively {MaxDiskQueue/1000})\n" +
                 $"MinAvailableMemory: {MinAvailableMemory} MB\n" +
                 "\n" +
                 "############### OPTIONAL PARAMETERS ###############\n" +
diff --git a/ATxService/AutoTx.cs b/ATxService/AutoTx.cs
index 770e354..86174e6 100644
--- a/ATxService/AutoTx.cs
+++ b/ATxService/AutoTx.cs
@@ -45,6 +45,8 @@ namespace ATxService
         /// </summary>
         private readonly Cpu _cpu;
 
+        private readonly PhysicalDisk _phyDisk;
+
         private RoboCommand _roboCommand;
         
         /// <summary>
@@ -62,6 +64,8 @@ namespace ATxService
         /// </summary>
         private int _waitCyclesBeforeNextTx;
 
+        private int _exceedingLoadLimit = 0;
+
         private DateTime _lastUserDirCheck = DateTime.MinValue;
 
         // the transfer state:
@@ -131,6 +135,21 @@ namespace ATxService
                 throw;
             }
 
+            try {
+                _phyDisk = new PhysicalDisk {
+                    Interval = 250,
+                    Limit = (float) _config.MaxDiskQueue / 1000,
+                    Probation = 16,
+                    Enabled = true
+                };
+                _phyDisk.LoadAboveLimit += OnLoadAboveLimit;
+                _phyDisk.LoadBelowLimit += OnLoadBelowLimit;
+            }
+            catch (Exception ex) {
+                Log.Error("Unexpected error initializing PhysicalDisk monitoring: {0}", ex.Message);
+                throw;
+            }
+
 
             if (_config.DebugRoboSharp) {
                 Debugger.Instance.DebugMessageEvent += HandleDebugMessage;
@@ -542,18 +561,25 @@ namespace ATxService
         #region general methods
 
         /// <summary>
-        /// Event handler for CPU load dropping below the configured limit.
+        /// Event handler for load dropping below the configured limit(s).
         /// </summary>
         private void OnLoadBelowLimit(object sender, EventArgs e) {
-            Log.Trace("Received a low-CPU-load event!");
-            ResumePausedTransfer();
+            _exceedingLoadLimit--;
+            if (_exceedingLoadLimit < 0)
+                _exceedingLoadLimit = 0;
+            Log.Log(_config.MonitoringLogLevel,
+                "Received 'low-load' from {0} (exceeding: {1})", sender, _exceedingLoadLimit);
+            if (_exceedingLoadLimit == 0)
+                ResumePausedTransfer();
         }
 
         /// <summary>
-        /// Event handler for CPU load exceeding the configured limit.
+        /// Event handler for load exceeding the configured limits.
         /// </summary>
         private void OnLoadAboveLimit(object sender, EventArgs e) {
-            Log.Trace("Received a high-CPU-load event!");
+            _exceedingLoadLimit++;
+            Log.Log(_config.MonitoringLogLevel,
+                "Received 'high-load' from {0} (exceeding: {1})", sender, _exceedingLoadLimit);
             PauseTransfer();
         }
 
@@ -565,6 +591,9 @@ namespace ATxService
 
             // check all system parameters for valid ranges and remember the reason in a string
             // if one of them is failing (to report in the log why we're suspended)
+            if (_phyDisk.HighLoad)
+                suspendReasons.Add("Disk I/O");
+
             if (_cpu.HighLoad)
                 suspendReasons.Add("CPU");
 
diff --git a/Resources/conf-minimal/config.common.xml b/Resources/conf-minimal/config.common.xml
index f35a532..1925ecb 100644
--- a/Resources/conf-minimal/config.common.xml
+++ b/Resources/conf-minimal/config.common.xml
@@ -24,6 +24,11 @@
     <!-- MaxCpuUsage: pause transfer if CPU usage is above this value (in %)-->
     <MaxCpuUsage>25</MaxCpuUsage>
 
+    <!-- MaxDiskQueue: maximum allowed length of disk I/O queue, multiplied
+         by 1000 (so a value of "500" here means the maximum queue length
+         allowed is "0.500"), otherwise running transfers will be paused. -->
+    <MaxDiskQueue>500</MaxDiskQueue>
+
     <!-- MinAvailableMemory: pause transfer if free RAM is below (in MB) -->
     <MinAvailableMemory>512</MinAvailableMemory>
 
diff --git a/Resources/conf/config.common.xml b/Resources/conf/config.common.xml
index 7502066..43845c1 100644
--- a/Resources/conf/config.common.xml
+++ b/Resources/conf/config.common.xml
@@ -24,6 +24,11 @@
     <!-- MaxCpuUsage: pause transfer if CPU usage is above this value (in %)-->
     <MaxCpuUsage>25</MaxCpuUsage>
 
+    <!-- MaxDiskQueue: maximum allowed length of disk I/O queue, multiplied
+         by 1000 (so a value of "500" here means the maximum queue length
+         allowed is "0.500"), otherwise running transfers will be paused. -->
+    <MaxDiskQueue>500</MaxDiskQueue>
+
     <!-- MinAvailableMemory: pause transfer if free RAM is below (in MB) -->
     <MinAvailableMemory>512</MinAvailableMemory>
 
-- 
GitLab