Rework last_log_timestamp handling

Rework last_log_timestamp handling

There was far too much ‘mangle strings and cross your fingers’ going on in there. Much better to deal with actual, honest-to-goodness Ruby objects as much as possible.

diff --git a/lib/mobystash/container.rb b/lib/mobystash/container.rb
index d87fb55..b7143ef 100644
--- a/lib/mobystash/container.rb
+++ b/lib/mobystash/container.rb
@@ -46,7 +46,7 @@ module Mobystash
     # docker_data is the Docker::Container instance representing the moby
     # container metadata, and system_config is the Mobystash::Config.
     #
-    def initialize(docker_data, system_config, last_log_timestamp: nil)
+    def initialize(docker_data, system_config, last_log_time:)
       @id = docker_data.id
 
       @config  = system_config
@@ -76,7 +76,7 @@ module Mobystash
         }
       }
 
-      @last_log_timestamp = last_log_timestamp || Time.at(0).utc.strftime("%FT%T.%NZ")
+      @last_log_time = last_log_time || Time.at(0).utc
       @llt_mutex = Mutex.new
 
       parse_labels(docker_data.info["Config"]["Labels"])
@@ -84,16 +84,22 @@ module Mobystash
       super
 
       @logger.debug(progname) do
-        (["Created new container listener.  Instance variables:"] + %i{@name @capture_logs @parse_syslog @tags @last_log_timestamp}.map do |iv|
+        (["Created new container listener.  Instance variables:"] + %i{@name @capture_logs @parse_syslog @tags @last_log_time}.map do |iv|
           "#{iv}=#{instance_variable_get(iv).inspect}"
         end).join("\n  ")
       end
     end
 
-    # The timestamp, in RFC3339 format, of the last log message which was
-    # received by this container.
+    # The RFC3339 format of the last log timestamp received.
     def last_log_timestamp
-      @llt_mutex.synchronize { @last_log_timestamp }
+      @llt_mutex.synchronize { @last_log_time.strftime("%FT%T.%NZ") }
+    end
+
+    # The Time of the first possible time at which a new log message or event
+    # could possibly have occurred, based on the timestamps of previous events
+    # and/or log entries received from Moby.
+    def next_log_time
+      @llt_mutex.synchronize { @last_log_time + ONE_NANOSECOND }
     end
 
     private
@@ -170,7 +176,7 @@ module Mobystash
             @logger.debug(progname) { "Container is not running; waiting for it to start or be destroyed" }
             wait_for_container_to_start(conn)
           else
-            @logger.debug(progname) { "Capturing logs since #{@last_log_timestamp}" }
+            @logger.debug(progname) { "Capturing logs from #{next_log_time}" }
 
             # The implementation of Docker::Container#streaming_logs has a
             # *terribad* memory leak, in that every log entry that gets received
@@ -185,7 +191,7 @@ module Mobystash
             conn.get(
               "/containers/#{@id}/logs",
               {
-                since: (Time.strptime(@last_log_timestamp, "%FT%T.%N%Z") + ONE_NANOSECOND).strftime("%s.%N"),
+                since: next_log_time.strftime("%s.%N"),
                 timestamps: true,
                 follow: true,
                 stdout: true,
@@ -210,10 +216,10 @@ module Mobystash
     end
 
     def wait_for_container_to_start(conn)
-      @logger.debug(progname) { "Asking for events since #{@last_log_timestamp}" }
+      @logger.debug(progname) { "Asking for events from #{next_log_time}" }
 
-      Docker::Event.since((Time.strptime(@last_log_timestamp, "%FT%T.%N%Z") + ONE_NANOSECOND).strftime("%s.%N"), {}, conn) do |event|
-        @last_log_timestamp = event.time
+      Docker::Event.since(next_log_time.strftime("%s.%N"), {}, conn) do |event|
+        @llt_mutex.synchronize { @last_log_time = Time.at(event.timeNano * ONE_NANOSECOND) }
 
         @logger.debug(progname) { "Docker event@#{event.timeNano}: #{event.Type}.#{event.Action} on #{event.ID}" }
 
@@ -224,12 +230,15 @@ module Mobystash
     def send_event(msg, stream)
       @config.log_entries_read_counter.increment(container_name: @name, container_id: @id, stream: stream.to_s)
 
+      log_timestamp, msg = msg.chomp.split(' ', 2)
+      log_time = Time.strptime(log_timestamp, "%FT%T.%N%Z")
+
       @llt_mutex.synchronize do
-        @last_log_timestamp, msg = msg.chomp.split(' ', 2)
+        @last_log_time = log_time
       end
 
       @config.last_log_entry_at.observe(
-        Time.strptime(@last_log_timestamp, "%FT%T.%N%Z").to_f,
+        log_time.to_f,
         container_name: @name, container_id: @id, stream: stream.to_s
       )
 
@@ -247,7 +256,7 @@ module Mobystash
       if !@filter_regex || !msg.match?(@filter_regex)
         event = {
           message: msg,
-          "@timestamp": @last_log_timestamp,
+          "@timestamp": log_time.strftime("%FT%T.%NZ"),
           moby: {
             stream: stream.to_s,
           },
diff --git a/lib/mobystash/system.rb b/lib/mobystash/system.rb
index 02ff4ce..eae83a4 100644
--- a/lib/mobystash/system.rb
+++ b/lib/mobystash/system.rb
@@ -156,7 +156,7 @@ module Mobystash
       # Thanks, Docker!
       Docker::Container.all({}, docker_connection).each do |c|
         begin
-          @containers[c.id] = Mobystash::Container.new(Docker::Container.get(c.id, {}, docker_connection), @config, last_log_timestamp: state_data[c.id])
+          @containers[c.id] = Mobystash::Container.new(Docker::Container.get(c.id, {}, docker_connection), @config, last_log_time: state_data[c.id] && Time.strptime(state_data[c.id], "%FT%T.%N%Z"))
           @containers[c.id].run!
         rescue Docker::Error::NotFoundError
           nil
diff --git a/spec/container_spec.rb b/spec/container_spec.rb
index 968b5c5..79bda6b 100644
--- a/spec/container_spec.rb
+++ b/spec/container_spec.rb
@@ -17,7 +17,8 @@ describe Mobystash::Container do
   let(:mock_writer)         { instance_double(LogstashWriter) }
   let(:config)              { Mobystash::Config.new(env, logger: logger) }
   let(:docker_data)         { container_fixture(container_name) }
-  let(:container)           { Mobystash::Container.new(docker_data, config) }
+  let(:last_log_time)       { nil }
+  let(:container)           { Mobystash::Container.new(docker_data, config, last_log_time: last_log_time) }
   let(:mock_conn)           { instance_double(Docker::Connection) }
   let(:mock_moby_container) { instance_double(Docker::Container) }
 
@@ -39,9 +40,9 @@ describe Mobystash::Container do
       end
     end
 
-    context "with a last_log_timestamp" do
+    context "with a last_log_time" do
       let(:container_name) { "basic_container" }
-      let(:container) { Mobystash::Container.new(docker_data, config, last_log_timestamp: "2009-06-03T09:06:03.987654321Z") }
+      let(:last_log_time)  { Time.at(1244019963 + Rational(987_654_321, 1_000_000_000)).utc }
 
       it "overrides the last_log_timestamp" do
         expect(container.last_log_timestamp).to eq("2009-06-03T09:06:03.987654321Z")
diff --git a/spec/system_spec.rb b/spec/system_spec.rb
index 4b0251a..769b378 100644
--- a/spec/system_spec.rb
+++ b/spec/system_spec.rb
@@ -143,7 +143,7 @@ describe Mobystash::System do
         allow(Docker::Connection).to receive(:new).with("unix:///var/run/test.sock", {}).and_return(mock_conn)
 
         allow(Docker::Container).to receive(:get).with("asdfasdfbasic", {}, mock_conn).and_return(docker_data)
-        allow(Mobystash::Container).to receive(:new).with(docker_data, system.config, last_log_timestamp: nil).and_return(mobystash_container)
+        allow(Mobystash::Container).to receive(:new).with(docker_data, system.config, last_log_time: nil).and_return(mobystash_container)
         allow(mobystash_container).to receive(:shutdown!)
         allow(mobystash_container).to receive(:last_log_timestamp).and_return("xyzzy")
       end
@@ -316,7 +316,7 @@ describe Mobystash::System do
 
       expect(Mobystash::Container)
         .to receive(:new)
-        .with(moby_container, system.config, last_log_timestamp: nil)
+        .with(moby_container, system.config, last_log_time: nil)
         .and_return(mobystash_container = instance_double(Mobystash::Container))
       expect(mobystash_container).to receive(:run!)
 
@@ -344,7 +344,7 @@ describe Mobystash::System do
 
       expect(Mobystash::Container)

[... diff too long, it was truncated ...]

GitHub sha: a3ac9465