import * as Sentry from "@sentry/browser";
import { datadogRum } from "@datadog/browser-rum";

import { multipart_msg_t } from "@skydio/lcm/types/skybus_tunnel/multipart_msg_t";

import EventChunks from "./event_chunks";

import { ReceiveStats, initialReceiveStats } from "./stats";
import { PublishOptions } from "../types";
import { logger } from "../logger";

export type SendDataCallback = (
  channel: string,
  data: Uint8Array,
  options?: PublishOptions
) => void;
export type ReceiveMsgCallback = (channel: string, data: Uint8Array) => void;

const RESET_STREAMING_TIMEOUT = 0.7; // seconds

// NOTE(sam): All of this logic is copied from infrastructure/skybus_tunnel/channel_buffer.cc
export default class ChannelBuffer {
  private channel: string;
  private publishMsg: ReceiveMsgCallback;
  // We want this mapping to be ordered
  private chunkMap: Map<number, EventChunks>;
  private maxBufferLength: number;
  private nextIdToPublish = -1;
  private expectedEventId = -1;
  private expectedChunkIndex = -1;
  private lastChunkAddedTime = -1;
  private lastPublishTime = -1;
  private numChunksDropped = 0;

  private stats: ReceiveStats;

  constructor(channel: string, msgCallback: ReceiveMsgCallback, maxBufferLength: number) {
    this.channel = channel;
    this.publishMsg = msgCallback;
    this.chunkMap = new Map();
    this.maxBufferLength = maxBufferLength;
    this.stats = { ...initialReceiveStats };
  }

  public getStats() {
    return this.stats;
  }

  public pushChunk(msg: multipart_msg_t) {
    const now = new Date().getTime();
    this.stats.lastChunkReceivedTime = now;
    this.stats.chunksReceived++;
    this.stats.bytesReceived += msg.chunk_size;
    // this is the first chunk
    if (this.nextIdToPublish === -1) {
      this.nextIdToPublish = msg.id;
      this.expectedEventId = msg.id;
      this.expectedChunkIndex = msg.chunk_index;
    } else {
      // Determine if we should ignore this chunk, or if the connection has reset
      const idDelta = msg.id - this.nextIdToPublish;
      const threshold = this.maxBufferLength * -3;
      const secondsSinceLastAdd = (now - this.lastChunkAddedTime) / 1000;

      let shouldReset = false;
      if (idDelta < threshold) {
        this.logStatus(
          `Reset id from ${this.nextIdToPublish} to ${msg.id} due to id gap of ${idDelta}`
        );
        shouldReset = true;
      } else if (idDelta < 0 && secondsSinceLastAdd > RESET_STREAMING_TIMEOUT) {
        this.logStatus(
          `Reset id from ${this.nextIdToPublish} to ${msg.id} due to timeout of ${secondsSinceLastAdd} seconds.`
        );
        shouldReset = true;
      }

      if (shouldReset) {
        // Reset to values from current message
        this.nextIdToPublish = msg.id;
        this.expectedEventId = msg.id;
        this.expectedChunkIndex = msg.chunk_index;
        if (this.chunkMap.size) {
          this.logStatus(`Clearing ${this.chunkMap.size} unpublished events.`);
          this.chunkMap.clear();
        }
      } else if (idDelta < 0) {
        this.numChunksDropped++;
        this.logStatus(
          `Dropping chunk from event ${msg.id} since it is older than ${this.nextIdToPublish}.`,
          `Last published ${(now - this.lastPublishTime) / 1000}.`,
          `Dropped ${this.numChunksDropped} incoming chunks total so far.`
        );
        return;
      } else if (idDelta > 100) {
        this.logStatus(`Large event id gap of +${idDelta}`);
      }
    }

    if (!this.chunkMap.has(msg.id)) {
      this.chunkMap.set(msg.id, new EventChunks(msg.chunk_count));
    }
    const eventChunks = this.chunkMap.get(msg.id)!;
    if (msg.chunk_count !== eventChunks.length) {
      this.logWarning(
        `Total chunks changed from ${eventChunks.length} to ${msg.chunk_count} for event ${msg.id}.`,
        `Dropping chunk ${msg.chunk_index}`
      );
      return;
    }
    if (msg.chunk_index >= eventChunks.length) {
      this.logWarning(
        `chunk_index ${msg.chunk_index} is >= total chunks ${eventChunks.length} for event ${msg.id}.`,
        "Dropping it."
      );
      return;
    }

    // Not the chunk we were expecting, must be a recovered chunk
    if (
      msg.id < this.expectedEventId ||
      (msg.id === this.expectedEventId && msg.chunk_index < this.expectedChunkIndex)
    ) {
      if (eventChunks.hasChunk(msg.chunk_index)) {
        this.logStatus(`Same chunk recovered twice (id=${msg.id}, index=${msg.chunk_index}).`);
      } else {
        this.stats.lastChunkRecoveredTime = now;
        this.stats.chunksRecovered++;
        eventChunks.numRecovered++;
      }
    } else {
      // Reset expectations based on current message
      this.expectedEventId = msg.id;
      this.expectedChunkIndex = msg.chunk_index + 1;
      if (this.expectedChunkIndex >= msg.chunk_count) {
        this.expectedChunkIndex = 0;
        this.expectedEventId++;
      }
    }

    eventChunks.receiveChunk(msg);
    this.lastChunkAddedTime = now;
  }

  public tick() {
    if (!this.chunkMap.size) {
      // No pending chunks yet
      return;
    }

    // Consider it stalled if we run out of space for new events
    let bufferStalled = this.chunkMap.size >= this.maxBufferLength;
    let numCompleteEvents = [...this.chunkMap.values()].filter(
      chunks => chunks.receivedAllChunks
    ).length;

    if (!bufferStalled && !numCompleteEvents) {
      // No complete events and remaining space, nothing to do right now
      return;
    }

    for (let [eventId, eventChunks] of this.chunkMap) {
      if (eventId > this.nextIdToPublish) {
        // This event is ahead of schedule
        if (bufferStalled || numCompleteEvents > 0) {
          // There are no chunks for the prior events - skip them
          const idGap = eventId - this.nextIdToPublish;
          this.stats.eventsDropped += idGap;
          this.stats.lastEventDroppedTime = new Date().getTime();
          this.nextIdToPublish = eventId;
        } else {
          // Not yet ready for this event
          break;
        }
      }

      if (eventChunks.receivedAllChunks) {
        try {
          this.publishCurrentEvent();
          numCompleteEvents--;
        } catch (e) {
          this.dropCurrentEvent();
          this.logError(e);
          Sentry.captureException(e); // still send the error to sentry
          datadogRum.addError(e); // still send the error to Datadog
        }
        bufferStalled = false;
      } else if (bufferStalled || numCompleteEvents > 0) {
        // Evict this event and move on to the next one
        this.dropCurrentEvent();
        if (numCompleteEvents === 0) {
          bufferStalled = false;
        }
      } else {
        // Stop the loop at the first incomplete, non-dropped event
        break;
      }
    }

    // Delete all old events below the next id to maintain that events will be published in order
    this.chunkMap.forEach((_chunks, eventId) => {
      if (eventId < this.nextIdToPublish) {
        this.chunkMap.delete(eventId);
      }
    });
  }

  private publishCurrentEvent() {
    const eventChunks = this.chunkMap.get(this.nextIdToPublish);
    if (!eventChunks) {
      throw new Error(`${this.channel}: No chunk map for event ${this.nextIdToPublish}`);
    }

    const now = new Date().getTime();
    this.stats.eventsCompleted++;
    this.stats.lastEventCompletedTime = now;
    this.stats.eventsRecovered += Number(eventChunks.numRecovered > 0);

    this.publishMsg(this.channel, eventChunks.getData());
    this.lastPublishTime = new Date().getTime();
    this.nextIdToPublish++;
  }

  private dropCurrentEvent() {
    // Give up on the current event and move the next event to the front of the queue
    const eventChunks = this.chunkMap.get(this.nextIdToPublish);
    if (eventChunks) {
      this.stats.chunksMissed += eventChunks.length - eventChunks.numReceived;
    } else {
      this.logWarning(
        `No events to drop. id=${this.nextIdToPublish}, map_size=${this.chunkMap.size}`
      );
    }
    this.stats.eventsDropped++;
    this.stats.lastEventDroppedTime = new Date().getTime();
    this.nextIdToPublish++;
  }

  private logStatus(message: any, ...args: any[]) {
    logger.log(`${this.channel}:`, message, ...args);
  }

  private logWarning(message: any, ...args: any[]) {
    logger.warn(`${this.channel}:`, message, ...args);
  }

  private logError(message: any, ...args: any[]) {
    logger.error(`${this.channel}:`, message, ...args);
  }
}
