use crate::bindings::filesystem::types;
use crate::runtime::{spawn_blocking, AbortOnDropJoinHandle};
use crate::{
    HostInputStream, HostOutputStream, StreamError, StreamResult, Subscribe, TrappableError,
};
use anyhow::anyhow;
use bytes::{Bytes, BytesMut};
use std::io;
use std::mem;
use std::sync::Arc;

/// Shorthand for a `Result` whose error type is [`FsError`].
pub type FsResult<T> = Result<T, FsError>;

/// Error type used by the filesystem host code: a [`TrappableError`]
/// specialised to the filesystem [`types::ErrorCode`].
///
/// In this file it is produced either from an error code (via `into()`) or
/// as a trap (via `trap`), see the `From` implementations below.
pub type FsError = TrappableError<types::ErrorCode>;

impl From<wasmtime::component::ResourceTableError> for FsError {
    fn from(error: wasmtime::component::ResourceTableError) -> Self {
        Self::trap(error)
    }
}

/// I/O errors are first translated into a filesystem `ErrorCode`, which is
/// then wrapped as an `FsError`.
impl From<io::Error> for FsError {
    fn from(err: io::Error) -> Self {
        let code = types::ErrorCode::from(err);
        code.into()
    }
}

/// A filesystem descriptor, which refers to either a file or a directory.
pub enum Descriptor {
    /// The descriptor refers to a [`File`].
    File(File),
    /// The descriptor refers to a [`Dir`].
    Dir(Dir),
}

impl Descriptor {
    /// Borrows the wrapped [`File`].
    ///
    /// Fails with `ErrorCode::BadDescriptor` when this descriptor is a
    /// directory.
    pub fn file(&self) -> Result<&File, types::ErrorCode> {
        if let Self::File(file) = self {
            Ok(file)
        } else {
            Err(types::ErrorCode::BadDescriptor)
        }
    }

    /// Borrows the wrapped [`Dir`].
    ///
    /// Fails with `ErrorCode::NotDirectory` when this descriptor is a file.
    pub fn dir(&self) -> Result<&Dir, types::ErrorCode> {
        if let Self::Dir(dir) = self {
            Ok(dir)
        } else {
            Err(types::ErrorCode::NotDirectory)
        }
    }

    /// Returns `true` when this descriptor is the `File` variant.
    pub fn is_file(&self) -> bool {
        matches!(self, Self::File(_))
    }

    /// Returns `true` when this descriptor is the `Dir` variant.
    pub fn is_dir(&self) -> bool {
        matches!(self, Self::Dir(_))
    }
}

bitflags::bitflags! {
    /// Permission bits for operating on a file.
    ///
    /// This is the type of the `perms` field of `File` and of the
    /// `file_perms` field of `Dir`.
    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
    pub struct FilePerms: usize {
        /// Read-permission bit.
        const READ = 0b1;
        /// Write-permission bit.
        const WRITE = 0b10;
    }
}

bitflags::bitflags! {
    /// The mode a file or directory was opened under.
    ///
    /// Stored in the `open_mode` field of `File` and `Dir` so that the mode
    /// can be reported later without querying the operating system.
    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
    pub struct OpenMode: usize {
        /// Bit for reading.
        const READ = 0b1;
        /// Bit for writing.
        const WRITE = 0b10;
    }
}

/// A host file together with the permissions and open mode to enforce on it.
///
/// Cloning is cheap with respect to the OS handle: the handle lives behind an
/// `Arc`, so clones share it.
#[derive(Clone)]
pub struct File {
    /// The operating system File this struct is mediating access to.
    ///
    /// Wrapped in an Arc because the same underlying file is used for
    /// implementing the stream types. A copy is also needed for
    /// [`spawn_blocking`].
    ///
    /// [`spawn_blocking`]: Self::spawn_blocking
    pub file: Arc<cap_std::fs::File>,
    /// Permissions to enforce on access to the file. These permissions are
    /// specified by a user of the `crate::WasiCtxBuilder`, and are
    /// enforced prior to any enforced by the underlying operating system.
    pub perms: FilePerms,
    /// The mode the file was opened under: bits for reading, and writing.
    /// Required to correctly report the DescriptorFlags, because cap-std
    /// doesn't presently provide a cross-platform equivalent of reading the
    /// oflags back out using fcntl.
    pub open_mode: OpenMode,

    /// When `true`, `run_blocking` calls its body directly on the calling
    /// thread; when `false`, the body is handed to `spawn_blocking` instead.
    allow_blocking_current_thread: bool,
}

impl File {
    /// Wraps an already-opened `cap_std` file.
    ///
    /// `perms` and `open_mode` are stored as given.
    /// `allow_blocking_current_thread` selects whether
    /// [`run_blocking`](Self::run_blocking) may run its body on the calling
    /// thread.
    pub fn new(
        file: cap_std::fs::File,
        perms: FilePerms,
        open_mode: OpenMode,
        allow_blocking_current_thread: bool,
    ) -> Self {
        let file = Arc::new(file);
        Self {
            file,
            perms,
            open_mode,
            allow_blocking_current_thread,
        }
    }

    /// Runs the blocking closure `body` against the underlying file and
    /// returns its result.
    ///
    /// How `body` runs depends on the configuration of this `File`:
    /// - If blocking the current thread is allowed, `body` is invoked right
    ///   here. The `async` signature is then only nominal: the returned
    ///   future completes on its first poll.
    /// - Otherwise `body` is handed to
    ///   [`spawn_blocking`](Self::spawn_blocking) and the resulting handle
    ///   is awaited immediately.
    ///
    /// Blocking the executor thread on purpose is unusual but acceptable for
    /// some workloads. Background:
    /// - [`crate::WasiCtxBuilder::allow_blocking_current_thread`]
    /// - [Poor performance of wasmtime file I/O maybe because tokio](https://github.com/bytecodealliance/wasmtime/issues/7973)
    /// - [Implement opt-in for enabling WASI to block the current thread](https://github.com/bytecodealliance/wasmtime/pull/8190)
    pub(crate) async fn run_blocking<F, R>(&self, body: F) -> R
    where
        F: FnOnce(&cap_std::fs::File) -> R + Send + 'static,
        R: Send + 'static,
    {
        if let Some(file) = self.as_blocking_file() {
            return body(file);
        }
        self.spawn_blocking(body).await
    }

    /// Starts `body` as a background blocking task and returns its handle.
    ///
    /// The task receives its own clone of the `Arc` holding the file, so it
    /// does not borrow from `self`.
    pub(crate) fn spawn_blocking<F, R>(&self, body: F) -> AbortOnDropJoinHandle<R>
    where
        F: FnOnce(&cap_std::fs::File) -> R + Send + 'static,
        R: Send + 'static,
    {
        let shared = Arc::clone(&self.file);
        spawn_blocking(move || body(&shared))
    }

    /// Gives direct access to the file when filesystem operations may block
    /// the current thread.
    ///
    /// Returns `None` when they may not, in which case callers have to go
    /// through `spawn_blocking`.
    pub(crate) fn as_blocking_file(&self) -> Option<&cap_std::fs::File> {
        if !self.allow_blocking_current_thread {
            return None;
        }
        Some(&self.file)
    }
}

bitflags::bitflags! {
    /// Permission bits for operating on a directory.
    ///
    /// Directories can be limited to being readonly. This will restrict what
    /// can be done with them, for example preventing creation of new files.
    ///
    /// This is the type of the `perms` field of `Dir`.
    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
    pub struct DirPerms: usize {
        /// This directory can be read, for example its entries can be iterated
        /// over and files can be opened.
        const READ = 0b1;

        /// This directory can be mutated, for example by creating new files
        /// within it.
        const MUTATE = 0b10;
    }
}

/// A host directory together with the permissions and open mode to enforce on
/// it.
#[derive(Clone)]
pub struct Dir {
    /// The operating system file descriptor this struct is mediating access
    /// to.
    ///
    /// Wrapped in an Arc because [`run_blocking`] needs a copy to move into
    /// the background task it spawns.
    ///
    /// [`run_blocking`]: Self::run_blocking
    pub dir: Arc<cap_std::fs::Dir>,
    /// Permissions to enforce on access to this directory. These permissions
    /// are specified by a user of the `crate::WasiCtxBuilder`, and
    /// are enforced prior to any enforced by the underlying operating system.
    ///
    /// These permissions are also enforced on any directories opened under
    /// this directory.
    pub perms: DirPerms,
    /// Permissions to enforce on any files opened under this directory.
    pub file_perms: FilePerms,
    /// The mode the directory was opened under: bits for reading, and writing.
    /// Required to correctly report the DescriptorFlags, because cap-std
    /// doesn't presently provide a cross-platform equivalent of reading the
    /// oflags back out using fcntl.
    pub open_mode: OpenMode,

    /// When `true`, `run_blocking` calls its body directly on the calling
    /// thread; when `false`, the body runs in a spawned blocking task.
    allow_blocking_current_thread: bool,
}

impl Dir {
    /// Wraps an already-opened `cap_std` directory.
    ///
    /// `perms`, `file_perms` and `open_mode` are stored as given.
    /// `allow_blocking_current_thread` selects whether
    /// [`run_blocking`](Self::run_blocking) may run its body on the calling
    /// thread.
    pub fn new(
        dir: cap_std::fs::Dir,
        perms: DirPerms,
        file_perms: FilePerms,
        open_mode: OpenMode,
        allow_blocking_current_thread: bool,
    ) -> Self {
        let dir = Arc::new(dir);
        Self {
            dir,
            perms,
            file_perms,
            open_mode,
            allow_blocking_current_thread,
        }
    }

    /// Runs the blocking closure `body` against the underlying directory and
    /// returns its result.
    ///
    /// How `body` runs depends on the configuration of this `Dir`:
    /// - If blocking the current thread is allowed, `body` is invoked right
    ///   here. The `async` signature is then only nominal: the returned
    ///   future completes on its first poll.
    /// - Otherwise `body` is moved, together with a clone of the `Arc`
    ///   holding the directory, into a background blocking task which is
    ///   awaited immediately.
    ///
    /// Blocking the executor thread on purpose is unusual but acceptable for
    /// some workloads. Background:
    /// - [`crate::WasiCtxBuilder::allow_blocking_current_thread`]
    /// - [Poor performance of wasmtime file I/O maybe because tokio](https://github.com/bytecodealliance/wasmtime/issues/7973)
    /// - [Implement opt-in for enabling WASI to block the current thread](https://github.com/bytecodealliance/wasmtime/pull/8190)
    pub(crate) async fn run_blocking<F, R>(&self, body: F) -> R
    where
        F: FnOnce(&cap_std::fs::Dir) -> R + Send + 'static,
        R: Send + 'static,
    {
        if self.allow_blocking_current_thread {
            return body(&self.dir);
        }

        let shared = Arc::clone(&self.dir);
        spawn_blocking(move || body(&shared)).await
    }
}

/// An input stream that reads a file starting from a given offset.
pub struct FileInputStream {
    /// The file being read. Background reads are spawned through it.
    file: File,
    /// Offset passed to the next read of the file. It is advanced by the
    /// number of bytes handed out by `read`.
    position: u64,
    /// Where the stream currently is in its read cycle; see `ReadState`.
    state: ReadState,
}
/// State machine of a `FileInputStream`.
enum ReadState {
    /// No read is in flight and no data is buffered.
    Idle,
    /// A background read task is in flight. Awaiting the handle yields the
    /// state the stream moves to next.
    Waiting(AbortOnDropJoinHandle<ReadState>),
    /// Bytes that were read from the file but not yet fully handed out.
    DataAvailable(Bytes),
    /// The last read failed. The error is reported by the next `read`, which
    /// also moves the stream to `Closed`.
    Error(io::Error),
    /// The stream is finished: a read returned zero bytes, a stored error was
    /// reported, or the stream was cancelled.
    Closed,
}
impl FileInputStream {
    pub fn new(file: &File, position: u64) -> Self {
        Self {
            file: file.clone(),
            position,
            state: ReadState::Idle,
        }
    }

    fn blocking_read(file: &cap_std::fs::File, offset: u64, size: usize) -> ReadState {
        use system_interface::fs::FileIoExt;

        let mut buf = BytesMut::zeroed(size);
        loop {
            match file.read_at(&mut buf, offset) {
                Ok(0) => return ReadState::Closed,
                Ok(n) => {
                    buf.truncate(n);
                    return ReadState::DataAvailable(buf.freeze());
                }
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
                    // Try again, continue looping
                }
                Err(e) => return ReadState::Error(e),
            }
        }
    }

    /// Wait for existing background task to finish, without starting any new background reads.
    async fn wait_ready(&mut self) {
        match &mut self.state {
            ReadState::Waiting(task) => {
                self.state = task.await;
            }
            _ => {}
        }
    }
}
#[async_trait::async_trait]
impl HostInputStream for FileInputStream {
    /// Non-blocking read of at most `size` bytes.
    ///
    /// Behaviour per state:
    /// - `Idle`: a zero-length request returns an empty buffer without
    ///   touching the file. Otherwise a background read of `size` bytes is
    ///   started and an empty buffer is returned for now.
    /// - `Waiting`: returns an empty buffer; the read is still in flight.
    /// - `DataAvailable`: returns up to `size` buffered bytes and advances
    ///   the position by that amount. The stream returns to `Idle` once the
    ///   buffer is drained.
    /// - `Error`: reports the stored error as
    ///   `StreamError::LastOperationFailed` and closes the stream.
    /// - `Closed`: returns `StreamError::Closed`.
    fn read(&mut self, size: usize) -> StreamResult<Bytes> {
        match &mut self.state {
            ReadState::Idle => {
                if size == 0 {
                    return Ok(Bytes::new());
                }

                let p = self.position;
                self.state = ReadState::Waiting(
                    self.file
                        .spawn_blocking(move |f| Self::blocking_read(f, p, size)),
                );
                Ok(Bytes::new())
            }
            ReadState::DataAvailable(b) => {
                let min_len = b.len().min(size);
                let chunk = b.split_to(min_len);
                if b.is_empty() {
                    self.state = ReadState::Idle;
                }
                self.position += min_len as u64;
                Ok(chunk)
            }
            ReadState::Waiting(_) => Ok(Bytes::new()),
            ReadState::Error(_) => match mem::replace(&mut self.state, ReadState::Closed) {
                ReadState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
                _ => unreachable!(),
            },
            ReadState::Closed => Err(StreamError::Closed),
        }
    }
    /// Specialized blocking_* variant to bypass tokio's task spawning & joining
    /// overhead on synchronous file I/O.
    ///
    /// Waits for any in-flight background read, performs a read through
    /// `File::run_blocking` if the stream is idle, and then defers to `read`
    /// to hand out the data or report the error.
    async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
        self.wait_ready().await;

        // A zero-length request must never reach the file: reading into an
        // empty buffer yields 0 bytes, which the inherent `blocking_read`
        // helper treats as end-of-file and which would permanently close the
        // stream. `read` already answers such a request with an empty buffer.
        if size != 0 {
            // Before we defer to the regular `read`, make sure it has data
            // ready to go:
            if let ReadState::Idle = self.state {
                let p = self.position;
                self.state = self
                    .file
                    .run_blocking(move |f| Self::blocking_read(f, p, size))
                    .await;
            }
        }

        self.read(size)
    }
    /// Closes the stream, waiting for an in-flight background read (if any)
    /// to be cancelled.
    async fn cancel(&mut self) {
        match mem::replace(&mut self.state, ReadState::Closed) {
            ReadState::Waiting(task) => {
                // The task was created using `spawn_blocking`, so unless we're
                // lucky enough that the task hasn't started yet, the abort
                // signal won't have any effect and we're forced to wait for it
                // to run to completion.
                // From the guest's point of view, `input-stream::drop` then
                // appears to block. Certainly less than ideal, but arguably still
                // better than letting the guest rack up an unbounded number of
                // background tasks. Also, the guest is only blocked if
                // the stream was dropped mid-read, which we don't expect to
                // occur frequently.
                task.cancel().await;
            }
            _ => {}
        }
    }
}
#[async_trait::async_trait]
impl Subscribe for FileInputStream {
    /// Resolves once the stream is no longer waiting on a background read.
    ///
    /// If the stream is idle, a read of a default size is started first, so
    /// that waiting without a prior `read` still makes progress.
    async fn ready(&mut self) {
        // Size of the read issued on the guest's behalf when it waits
        // without having requested any data.
        const DEFAULT_READ_SIZE: usize = 4096;

        if matches!(self.state, ReadState::Idle) {
            let offset = self.position;
            let task = self
                .file
                .spawn_blocking(move |f| Self::blocking_read(f, offset, DEFAULT_READ_SIZE));
            self.state = ReadState::Waiting(task);
        }

        self.wait_ready().await
    }
}

/// Where a `FileOutputStream` places the bytes it writes.
#[derive(Clone, Copy)]
pub(crate) enum FileOutputMode {
    /// Write at this byte offset. The stream advances the offset by the
    /// number of bytes written after each successful write.
    Position(u64),
    /// Append every write to the file.
    Append,
}

/// An output stream that writes to a file, either at a tracked offset or in
/// append mode.
pub(crate) struct FileOutputStream {
    /// The file being written. Background writes are spawned through it.
    file: File,
    /// Write placement (offset or append); see `FileOutputMode`.
    mode: FileOutputMode,
    /// Where the stream currently is in its write cycle; see `OutputState`.
    state: OutputState,
}

/// State machine of a `FileOutputStream`.
enum OutputState {
    /// No write is in flight; a new write may be started.
    Ready,
    /// A background write task is in flight. Awaiting the handle yields the
    /// number of bytes written, or the I/O error that stopped the write.
    Waiting(AbortOnDropJoinHandle<io::Result<usize>>),
    /// The last I/O operation failed with this error.
    Error(io::Error),
    /// The stream is finished: a stored error was reported, a blocking write
    /// failed, or the stream was cancelled.
    Closed,
}

impl FileOutputStream {
    /// Creates a ready stream over a clone of `file` whose first write lands
    /// at byte offset `position`.
    pub fn write_at(file: &File, position: u64) -> Self {
        Self {
            file: file.clone(),
            mode: FileOutputMode::Position(position),
            state: OutputState::Ready,
        }
    }

    /// Creates a ready stream over a clone of `file` that appends every
    /// write.
    pub fn append(file: &File) -> Self {
        Self {
            file: file.clone(),
            mode: FileOutputMode::Append,
            state: OutputState::Ready,
        }
    }

    /// Synchronously writes all of `buf` to `file` according to `mode` and
    /// returns the total number of bytes written.
    ///
    /// Short writes are continued until the buffer is drained. In `Position`
    /// mode each continuation is placed directly after the bytes already
    /// written.
    ///
    /// # Errors
    ///
    /// Returns the first I/O error other than `Interrupted` (which is
    /// retried). Returns an error of kind `WriteZero` if the file accepts
    /// zero bytes while data is still pending.
    fn blocking_write(
        file: &cap_std::fs::File,
        buf: Bytes,
        mode: FileOutputMode,
    ) -> io::Result<usize> {
        use system_interface::fs::FileIoExt;

        match mode {
            FileOutputMode::Position(start) => {
                Self::write_fully(buf, |chunk, done| file.write_at(chunk, start + done as u64))
            }
            FileOutputMode::Append => Self::write_fully(buf, |chunk, _done| file.append(chunk)),
        }
    }

    /// Drives `write_some` until all of `buf` has been accepted.
    ///
    /// `write_some` receives the bytes still pending and the number of bytes
    /// accepted so far, and returns how many of the pending bytes it wrote.
    /// At least one call is made, even for an empty buffer.
    fn write_fully(
        mut buf: Bytes,
        mut write_some: impl FnMut(&[u8], usize) -> io::Result<usize>,
    ) -> io::Result<usize> {
        let mut total = 0;
        loop {
            let nwritten = match write_some(buf.as_ref(), total) {
                Ok(n) => n,
                // Mirror the read path: an interrupted call is simply retried.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            };
            // Zero progress on a non-empty buffer would otherwise make this
            // loop spin forever; surface it as an error instead.
            if nwritten == 0 && !buf.is_empty() {
                return Err(io::Error::new(
                    io::ErrorKind::WriteZero,
                    "failed to write whole buffer",
                ));
            }
            // afterwards buf contains [nwritten, len):
            let _ = buf.split_to(nwritten);
            total += nwritten;
            if buf.is_empty() {
                return Ok(total);
            }
        }
    }
}

// FIXME: configurable? determine from how much space left in file?
/// Number of bytes (1 MiB) that `FileOutputStream::check_write` reports as
/// writable while the stream is ready.
const FILE_WRITE_CAPACITY: usize = 1024 * 1024;

#[async_trait::async_trait]
impl HostOutputStream for FileOutputStream {
    fn write(&mut self, buf: Bytes) -> Result<(), StreamError> {
        match self.state {
            OutputState::Ready => {}
            OutputState::Closed => return Err(StreamError::Closed),
            OutputState::Waiting(_) | OutputState::Error(_) => {
                // a write is pending - this call was not permitted
                return Err(StreamError::Trap(anyhow!(
                    "write not permitted: check_write not called first"
                )));
            }
        }

        let m = self.mode;
        self.state = OutputState::Waiting(
            self.file
                .spawn_blocking(move |f| Self::blocking_write(f, buf, m)),
        );
        Ok(())
    }
    /// Specialized blocking_* variant to bypass tokio's task spawning & joining
    /// overhead on synchronous file I/O.
    async fn blocking_write_and_flush(&mut self, buf: Bytes) -> StreamResult<()> {
        self.ready().await;

        match self.state {
            OutputState::Ready => {}
            OutputState::Closed => return Err(StreamError::Closed),
            OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
                OutputState::Error(e) => return Err(StreamError::LastOperationFailed(e.into())),
                _ => unreachable!(),
            },
            OutputState::Waiting(_) => unreachable!("we've just waited for readiness"),
        }

        let m = self.mode;
        match self
            .file
            .run_blocking(move |f| Self::blocking_write(f, buf, m))
            .await
        {
            Ok(nwritten) => {
                if let FileOutputMode::Position(ref mut p) = &mut self.mode {
                    *p += nwritten as u64;
                }
                self.state = OutputState::Ready;
                Ok(())
            }
            Err(e) => {
                self.state = OutputState::Closed;
                Err(StreamError::LastOperationFailed(e.into()))
            }
        }
    }
    fn flush(&mut self) -> Result<(), StreamError> {
        match self.state {
            // Only userland buffering of file writes is in the blocking task,
            // so there's nothing extra that needs to be done to request a
            // flush.
            OutputState::Ready | OutputState::Waiting(_) => Ok(()),
            OutputState::Closed => Err(StreamError::Closed),
            OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
                OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
                _ => unreachable!(),
            },
        }
    }
    fn check_write(&mut self) -> Result<usize, StreamError> {
        match self.state {
            OutputState::Ready => Ok(FILE_WRITE_CAPACITY),
            OutputState::Closed => Err(StreamError::Closed),
            OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
                OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
                _ => unreachable!(),
            },
            OutputState::Waiting(_) => Ok(0),
        }
    }
    async fn cancel(&mut self) {
        match mem::replace(&mut self.state, OutputState::Closed) {
            OutputState::Waiting(task) => {
                // The task was created using `spawn_blocking`, so unless we're
                // lucky enough that the task hasn't started yet, the abort
                // signal won't have any effect and we're forced to wait for it
                // to run to completion.
                // From the guest's point of view, `output-stream::drop` then
                // appears to block. Certainly less than ideal, but arguably still
                // better than letting the guest rack up an unbounded number of
                // background tasks. Also, the guest is only blocked if
                // the stream was dropped mid-write, which we don't expect to
                // occur frequently.
                task.cancel().await;
            }
            _ => {}
        }
    }
}

#[async_trait::async_trait]
impl Subscribe for FileOutputStream {
    async fn ready(&mut self) {
        if let OutputState::Waiting(task) = &mut self.state {
            self.state = match task.await {
                Ok(nwritten) => {
                    if let FileOutputMode::Position(ref mut p) = &mut self.mode {
                        *p += nwritten as u64;
                    }
                    OutputState::Ready
                }
                Err(e) => OutputState::Error(e),
            };
        }
    }
}

/// A boxed iterator over directory entries.
///
/// The iterator sits behind a `std::sync::Mutex` so that `next` can advance
/// it through a shared reference.
pub struct ReaddirIterator(
    std::sync::Mutex<Box<dyn Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static>>,
);

impl ReaddirIterator {
    pub(crate) fn new(
        i: impl Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static,
    ) -> Self {
        ReaddirIterator(std::sync::Mutex::new(Box::new(i)))
    }
    pub(crate) fn next(&self) -> FsResult<Option<types::DirectoryEntry>> {
        self.0.lock().unwrap().next().transpose()
    }
}

impl IntoIterator for ReaddirIterator {
    type Item = FsResult<types::DirectoryEntry>;
    type IntoIter = Box<dyn Iterator<Item = Self::Item> + Send>;

    fn into_iter(self) -> Self::IntoIter {
        self.0.into_inner().unwrap()
    }
}