1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
#![cfg(feature = "threadsafe")]

//! This module provides facilities for accessing Hexchat from routines
//! running on threads other than Hexchat's main thread.
//!
//! Hexchat's plugin API isn't inherently thread-safe, however plugins
//! can spawn separate threads and invoke Hexchat's API by placing routines
//! to execute on Hexchat's main thread.
//!
//! `main_thread()` makes it easy to declare a function, or closure, that
//! contains Hexchat API calls. Once executed, it uses the timer feature
//! of Hexchat to delegate. The function or closure can return any sendable
//! cloneable value, and `main_thread()` will pass that back to the calling
//! thread via an `AsyncResult` object. This can either be ignored, and
//! the thread can continue doing other work, or `AsyncResult.get()` can be
//! invoked on the result object; this call will block until the main thread
//! has finished executing the callback.

use std::collections::LinkedList;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

use crate::hexchat::Hexchat;
use crate::hexchat_entry_points::PHEXCHAT;
use crate::{user_data::*, HexchatError};

use UserData::*;

// Maximum number of queued tasks the timer callback executes in a single
// invocation before returning control to Hexchat.
const TASK_SPURT_SIZE: i32 = 5;
// Interval handed to `hook_timer()` for the queue handler. Presumably in
// milliseconds, going by the name - confirm against `Hexchat::hook_timer()`.
const TASK_REST_MSECS: i64 = 2;

// The type of the queue that closures will be added to and pulled from to run
// on the main thread of Hexchat. Tasks are appended with `push_back()` and
// consumed with `pop_front()`, so they run in the order they were queued.
type TaskQueue = LinkedList<Box<dyn Task>>;

/// The task queue that other threads use to schedule tasks to run on the
/// main thread. It is guarded by a `Mutex`.
///
/// The outer `Option` is `None` until `main_thread_init()` creates the queue,
/// and again after `main_thread_deinit()` resets it. The inner `Option` is
/// emptied by `main_thread_deinit()`; `main_thread()` and the timer callback
/// treat an empty inner `Option` as "the queue has been shut down".
///
static mut TASK_QUEUE: Option<Arc<Mutex<Option<TaskQueue>>>> = None;

/// The main thread's ID is captured by `main_thread_init()` and used by
/// `main_thread()` to determine whether it is being called from the main
/// thread or not. If it is, the callback is invoked right away. Otherwise,
/// the callback gets scheduled on the task queue.
///
pub(crate) static mut MAIN_THREAD_ID: Option<thread::ThreadId> = None;

/// Base trait for items placed on the task queue. It erases the concrete
/// closure and result types so differently typed tasks can share one queue.
///
trait Task : Send {
    /// Runs the task, giving it access to the Hexchat object.
    fn execute(&mut self, hexchat: &Hexchat);
    /// Marks the task as failed with the given message instead of running it.
    fn set_error(&mut self, error: &str);
}

/// A task that executes a closure on the main thread.
///
/// `F` is the closure type and `R` the type of the value it returns.
///
struct ConcreteTask<F, R>
where
    F: FnMut(&Hexchat) -> R,
    R: Clone + Send,
{
    // The closure to invoke with the Hexchat object.
    callback : F,
    // Handle through which the closure's return value, or an error, is
    // delivered to whoever holds the matching `AsyncResult`.
    result   : AsyncResult<R>,
}

impl<F, R> ConcreteTask<F, R>
where
    F: FnMut(&Hexchat) -> R,
    R: Clone + Send,
{
    fn new(callback: F, result: AsyncResult<R>) -> Self {
        ConcreteTask {
            callback,
            result,
        }
    }
}

impl<F, R> Task for ConcreteTask<F, R>
where
    F: FnMut(&Hexchat) -> R,
    R: Clone + Send,
{
    /// Runs the stored closure with the Hexchat object and publishes the
    /// value it returns through the task's `AsyncResult`.
    ///
    fn execute(&mut self, hexchat: &Hexchat) {
        // The closure finishes before the result is stored.
        let value = (self.callback)(hexchat);
        self.result.set(value);
    }
    /// Publishes an error through the task's `AsyncResult` without running
    /// the closure. Used when the task queue is being shut down.
    ///
    fn set_error(&mut self, error: &str) {
        let handle = &self.result;
        handle.set_error(error);
    }
}

// Asserts that a `ConcreteTask` may be moved to another thread, which the
// `Task: Send` supertrait requires.
//
// SAFETY: NOTE(review) - `F` is not bounded by `Send` in this impl. The only
// place in this file that constructs a `ConcreteTask` is `main_thread()`,
// which does require `F: Send`; keep it that way or tighten the bound here.
unsafe impl<F, R> Send for ConcreteTask<F, R>
where
    F: FnMut(&Hexchat) -> R,
    R: Clone + Send,
{}

/// A result object that allows callbacks operating on a thread to send their
/// return value to a receiver calling `get()` from another thread. Whether
/// return data needs to be transferred or not, this object can be used to wait
/// on the completion of a callback, thus providing synchronization between
/// threads.
///
/// Cloning an `AsyncResult` yields another handle to the same shared state.
///
#[allow(clippy::type_complexity)]
#[derive(Clone)]
pub struct AsyncResult<T: Clone + Send> {
    // Shared state: the mutex guards a pair of (outcome, done flag). The
    // outcome starts as `None` with the flag `false`; both are filled in
    // together when a result or an error is set. The `Condvar` is notified
    // at that point to wake threads blocked in `get()`.
    data: Arc<(Mutex<(Option<Result<T, HexchatError>>, bool)>, Condvar)>,
}

// Assert that `AsyncResult` handles can be moved to, and shared between,
// threads.
//
// SAFETY: all access to the stored value goes through the `Mutex` inside
// `data`, and `T` is required to be `Send`.
// NOTE(review): these manual impls presumably exist because `HexchatError`
// is not automatically `Send` - confirm, since they also vouch for it.
unsafe impl<T: Clone + Send> Send for AsyncResult<T> {}
unsafe impl<T: Clone + Send> Sync for AsyncResult<T> {}

impl<T: Clone + Send> AsyncResult<T> {
    /// Constructor. Initializes the return data to `None` and the done flag
    /// to `false`.
    ///
    pub (crate)
    fn new() -> Self {
        AsyncResult {
            data: Arc::new((Mutex::new((None, false)), Condvar::new()))
        }
    }
    /// Indicates whether the callback executing on another thread is done or
    /// not. This can be used to poll for the result.
    ///
    pub fn is_done(&self) -> bool {
        let (mtx, _) = &*self.data;
        mtx.lock().unwrap().1
    }
    /// Blocking call to retrieve the return data from a callback on another
    /// thread.
    ///
    /// A successful value is cloned out of the shared state, so `get()` can
    /// be called repeatedly, and on any clone of this `AsyncResult`, without
    /// panicking.
    ///
    /// # Returns
    /// * The value produced by the callback once it has finished.
    ///
    /// # Errors
    /// * The error stored for the operation, e.g. when the task queue was
    ///   shut down before the callback could run. A stored error is handed
    ///   out once; later calls report that it was already retrieved.
    ///
    pub fn get(&self) -> Result<T, HexchatError> {
        use HexchatError::ThreadSafeOperationFailed as Error;
        let (mtx, cvar) = &*self.data;
        let mut guard   = mtx.lock().unwrap();
        // Loop to guard against spurious wake-ups of the condition variable.
        while !guard.1 {
            guard = cvar.wait(guard).unwrap();
        }
        // Leave successful values in place so further calls still see them.
        if let Some(Ok(value)) = guard.0.as_ref() {
            return Ok(value.clone());
        }
        // The error is moved out rather than cloned. If it is already gone,
        // report that instead of panicking on an empty slot.
        guard.0.take().unwrap_or_else(|| {
            Err(Error("The result has already been retrieved.".into()))
        })
    }
    /// Sets the return data for the async result. This will unblock the
    /// receiver waiting on the result from `get()`.
    ///
    /// # Arguments
    /// * `result` - The value to hand to the receiver.
    ///
    pub (crate)
    fn set(&self, result: T) {
        let (mtx, cvar) = &*self.data;
        let mut guard   = mtx.lock().unwrap();
               *guard   = (Some(Ok(result)), true);
        cvar.notify_all();
    }
    /// Marks the operation as failed. This will unblock the receiver waiting
    /// on the result from `get()`, which then receives a
    /// `ThreadSafeOperationFailed` error carrying `error`.
    ///
    /// # Arguments
    /// * `error` - The message describing why the operation failed.
    ///
    fn set_error(&self, error: &str) {
        use HexchatError::ThreadSafeOperationFailed as Error;
        let (mtx, cvar) = &*self.data;
        let mut guard   = mtx.lock().unwrap();
               *guard   = (Some(Err(Error(error.into()))), true);
        cvar.notify_all();
    }
}

/// Runs a closure on the Hexchat main thread and hands back an `AsyncResult`
/// through which the closure's return value can be retrieved.
///
/// When called from the main thread itself, the closure is invoked right
/// away and the returned `AsyncResult` already holds its value. When called
/// from any other thread, the closure is placed on the task queue and this
/// function returns immediately. If the task queue is not available, the
/// closure is not run and the returned `AsyncResult` holds an error.
///
/// # Arguments
/// * `callback` - The callback to execute on the main thread.
///
/// # Returns
/// * An `AsyncResult` for the outcome of the callback.
///
pub fn main_thread<F, R>(mut callback: F) -> AsyncResult<R>
where
    F: FnMut(&Hexchat) -> R + Sync + Send,
    F: 'static + Send,
    R: 'static + Clone + Send,
{
    let outcome = AsyncResult::new();

    // Already on the main thread: no need to go through the queue.
    let on_main_thread = unsafe { MAIN_THREAD_ID } == Some(thread::current().id());
    if on_main_thread {
        outcome.set(callback(unsafe { &*PHEXCHAT }));
        return outcome;
    }

    match unsafe { TASK_QUEUE.as_ref() } {
        Some(shared_queue) => {
            let mut slot = shared_queue.lock().unwrap();
            match slot.as_mut() {
                Some(tasks) => {
                    // The task keeps its own handle to the shared result.
                    let task = ConcreteTask::new(callback, outcome.clone());
                    tasks.push_back(Box::new(task));
                }
                None => outcome.set_error("Task queue has been shut down."),
            }
        }
        None => outcome.set_error("Task queue has been shut down."),
    }
    outcome
}

/// This initializes the fundamental thread-safe features of this library.
///
/// The calling thread is recorded as Hexchat's main thread. If no task queue
/// exists yet, a mutex guarded task queue is created, and a timer function is
/// registered that handles the queue at `TASK_REST_MSECS` intervals. Each
/// invocation of the handler runs queued tasks one after another, up to
/// `TASK_SPURT_SIZE` of them, without rest. The handler returns 1 to stay
/// registered, or 0 once the task queue is gone so the timer is removed.
///
/// The queue lock is only held while a task is being removed from the queue,
/// never while the task runs. Other threads can therefore keep scheduling
/// work during a long running task, and a task that ends up calling
/// `main_thread_deinit()` does not deadlock on the queue's mutex.
///
pub (crate)
fn main_thread_init() {
    unsafe { MAIN_THREAD_ID = Some(thread::current().id()) }
    if unsafe { TASK_QUEUE.is_none() } {
        unsafe {
            TASK_QUEUE = Some(Arc::new(Mutex::new(Some(LinkedList::new()))));
        }
        let hex = unsafe { &*PHEXCHAT };

        hex.hook_timer(
            TASK_REST_MSECS,
            move |_hc, _ud| {
                // Take our own reference to the queue for this invocation so
                // it stays alive even if a task shuts the queue down.
                if let Some(arc) = unsafe { TASK_QUEUE.clone() } {
                    if arc.lock().unwrap().is_some() {
                        for _ in 0..TASK_SPURT_SIZE {
                            // Pop in a statement of its own: the lock guard
                            // is a temporary that gets dropped at the end of
                            // this `let`, before the task is executed.
                            let next = arc.lock().unwrap()
                                          .as_mut()
                                          .and_then(|q| q.pop_front());
                            match next {
                                Some(mut task) => task.execute(hex),
                                None => break,
                            }
                        }
                        1 // Keep going.
                    } else {
                        0 // Task queue is gone, remove timer callback.
                    }
                } else {
                    0 // Task queue is gone, remove timer callback.
                }
            },
            NoData);
    }
}

/// Called when an addon is being unloaded. This eliminates the task queue.
/// Every task still waiting on the queue has its result set to an error,
/// which wakes up any thread blocked on `.get()` for that task's
/// `AsyncResult`. This can also be called from addons if the thread-safe
/// features aren't going to be utilized, so the timer callback isn't invoked
/// endlessly doing nothing.
///
pub (crate)
fn main_thread_deinit() {
    if let Some(shared_queue) = unsafe { &TASK_QUEUE } {
        // Keep the lock for the whole drain so nothing can be queued while
        // the pending tasks are being failed.
        let mut slot = shared_queue.lock().unwrap();
        if let Some(pending) = slot.take() {
            for mut task in pending {
                task.set_error("Task queue is being shut down.");
            }
        }
    }
    unsafe { TASK_QUEUE = None; }
}

/// Stops and removes the main thread task queue handler. Otherwise it will
/// keep checking the queue while doing nothing useful - which isn't
/// necessarily bad. Performance is unaffected either way.
///
/// This simply calls `main_thread_deinit()`: tasks still waiting on the queue
/// have their results set to an error, and the task queue is discarded.
///
/// Support for `main_thread()` is on by default. After this function is
/// invoked, `main_thread()` should not be used and threads in general risk
/// crashing the software if they try to access Hexchat directly without
/// the `main_thread()`. `ThreadSafeContext` and `ThreadSafeListIterator`
/// should also not be used after this function is called, since they rely on
/// `main_thread()` internally.
///
/// # Safety
/// While this will disable the handling of the main thread task queue, it
/// doesn't prevent the plugin author from spawning threads and attempting to
/// use the features of the threadsafe objects this crate provides. If the
/// plugin author intends to use `ThreadSafeContext`, `ThreadSafeListIterator`,
/// or invoke `main_thread()` directly, then this function should not be called.
///
#[deprecated(
    since = "0.2.6",
    note = "This function is no longer necessary. Threadsafe features can be \
            turned off by specifying `features = []` in the Cargo.toml file \
            for the `hexchat-api` dependency.")]
pub unsafe fn turn_off_threadsafe_features() {
    // Tear down the task queue; the timer callback removes itself once it
    // finds the queue gone.
    main_thread_deinit();
}