 #include <linux/freezer.h>
 #include "async-thread.h"
 
+#define WORK_QUEUED_BIT 0
+#define WORK_DONE_BIT 1
+#define WORK_ORDER_DONE_BIT 2
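+
+/*
+ * bits used in btrfs_work->flags: WORK_QUEUED_BIT marks an item that is
+ * already on a worker's pending list (so it isn't queued twice).  For
+ * ordered queues, WORK_DONE_BIT is set once ->func has run and
+ * WORK_ORDER_DONE_BIT once the ordered completion has been claimed.
+ */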
+
 /*
  * container for the kthread task pointer and the list of pending work
  * One of these is allocated per thread.
        }
 }
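+
+/*
+ * for ordered work queues: walk the order_list from the front and, for
+ * each item whose func has already finished, call its ordered_func and
+ * then ordered_free.  The walk stops at the first unfinished item, so
+ * ordered completions always run in the order the work was queued.
+ */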
 
+static noinline int run_ordered_completions(struct btrfs_workers *workers,
+                                           struct btrfs_work *work)
+{
+       unsigned long flags;
+
+       if (!workers->ordered)
+               return 0;
+
+       set_bit(WORK_DONE_BIT, &work->flags);
+
+       spin_lock_irqsave(&workers->lock, flags);
+
+       while (!list_empty(&workers->order_list)) {
+               work = list_entry(workers->order_list.next,
+                                 struct btrfs_work, order_list);
+
+               if (!test_bit(WORK_DONE_BIT, &work->flags))
+                       break;
+
+               /* we are going to call the ordered done function, but
+                * we leave the work item on the list as a barrier so
+                * that later work items that are done don't have their
+                * functions called before this one returns
+                */
+               if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
+                       break;
+
+               spin_unlock_irqrestore(&workers->lock, flags);
+
+               work->ordered_func(work);
+
+               /* now take the lock again and call the freeing code */
+               spin_lock_irqsave(&workers->lock, flags);
+               list_del(&work->order_list);
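+               /* the work must not be touched once ordered_free has run */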
+               work->ordered_free(work);
+       }
+
+       spin_unlock_irqrestore(&workers->lock, flags);
+       return 0;
+}
+
 /*
  * main loop for servicing work items
  */
                        cur = worker->pending.next;
                        work = list_entry(cur, struct btrfs_work, list);
                        list_del(&work->list);
-                       clear_bit(0, &work->flags);
+                       clear_bit(WORK_QUEUED_BIT, &work->flags);
 
                        work->worker = worker;
                        spin_unlock_irq(&worker->lock);
                        work->func(work);
 
                        atomic_dec(&worker->num_pending);
+                       /*
+                        * unless this is an ordered work queue, 'work'
+                        * was probably freed by func above, so don't
+                        * touch it here.  run_ordered_completions only
+                        * dereferences it when workers->ordered is set.
+                        */
+                       run_ordered_completions(worker->workers, work);
+
                        spin_lock_irq(&worker->lock);
                        check_idle_worker(worker);
+
                }
                worker->working = 0;
                if (freezing(current)) {
        workers->num_workers = 0;
        INIT_LIST_HEAD(&workers->worker_list);
        INIT_LIST_HEAD(&workers->idle_list);
+       INIT_LIST_HEAD(&workers->order_list);
        spin_lock_init(&workers->lock);
        workers->max_workers = max;
        workers->idle_thresh = 32;
        workers->name = name;
+       workers->ordered = 0;
 }
 
 /*
        struct btrfs_worker_thread *worker = work->worker;
        unsigned long flags;
 
-       if (test_and_set_bit(0, &work->flags))
+       if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
                goto out;
 
        spin_lock_irqsave(&worker->lock, flags);
        int wake = 0;
 
        /* don't requeue something already on a list */
-       if (test_and_set_bit(0, &work->flags))
+       if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
                goto out;
 
        worker = find_worker(workers);
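+       /*
+        * ordered work also goes on the workers order_list so that its
+        * ordered_func and ordered_free can be called in queue order
+        * after the main func has run
+        */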
+       if (workers->ordered) {
+               spin_lock_irqsave(&workers->lock, flags);
+               list_add_tail(&work->order_list, &workers->order_list);
+               spin_unlock_irqrestore(&workers->lock, flags);
+       } else {
+               INIT_LIST_HEAD(&work->order_list);
+       }
 
        spin_lock_irqsave(&worker->lock, flags);
        atomic_inc(&worker->num_pending);
 
  */
 struct btrfs_work {
        /*
-        * only func should be set to the function you want called
+        * func should be set to the function you want called
         * your work struct is passed as the only arg
+        *
+        * ordered_func must be set for work sent to an ordered work queue,
+        * and it is called to complete work items in the same order they
+        * were sent to the queue.
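+        *
+        * ordered_free is called last, after ordered_func, and typically
+        * frees the work struct (see run_one_async_free).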
         */
        void (*func)(struct btrfs_work *work);
+       void (*ordered_func)(struct btrfs_work *work);
+       void (*ordered_free)(struct btrfs_work *work);
 
        /*
         * flags should be set to zero.  It is used to make sure the
        /* don't touch these */
        struct btrfs_worker_thread *worker;
        struct list_head list;
+       struct list_head order_list;
 };
 
 struct btrfs_workers {
        /* once a worker has this many requests or fewer, it is idle */
        int idle_thresh;
 
+       /* force completions in the order they were queued */
+       int ordered;
+
        /* list with all the work threads.  The workers on the idle thread
         * may be actively servicing jobs, but they haven't yet hit the
         * idle thresh limit above.
        struct list_head worker_list;
        struct list_head idle_list;
 
+       /*
+        * when operating in ordered mode, this maintains the list
+        * of work items waiting for completion
+        */
+       struct list_head order_list;
+
        /* lock for finding the next worker thread to queue on */
        spinlock_t lock;
 
 
        struct inode *inode;
        struct bio *bio;
        struct list_head list;
-       extent_submit_bio_hook_t *submit_bio_hook;
+       extent_submit_bio_hook_t *submit_bio_start;
+       extent_submit_bio_hook_t *submit_bio_done;
        int rw;
        int mirror_num;
        unsigned long bio_flags;
                btrfs_async_submit_limit(info);
 }
 
-static void run_one_async_submit(struct btrfs_work *work)
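+/*
+ * async bio submission is split in two phases: the start hook runs as
+ * the normal work func and does the expensive prep (checksumming) in
+ * parallel on the worker threads, while the done hook runs as the
+ * ordered_func so btrfs_map_bio is called in the order the bios were
+ * queued.  run_one_async_free is the ordered_free hook and releases
+ * the async_submit_bio once the others have run.
+ */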
+static void run_one_async_start(struct btrfs_work *work)
+{
+       struct btrfs_fs_info *fs_info;
+       struct async_submit_bio *async;
+
+       async = container_of(work, struct async_submit_bio, work);
+       fs_info = BTRFS_I(async->inode)->root->fs_info;
+       async->submit_bio_start(async->inode, async->rw, async->bio,
+                              async->mirror_num, async->bio_flags);
+}
+
+static void run_one_async_done(struct btrfs_work *work)
 {
        struct btrfs_fs_info *fs_info;
        struct async_submit_bio *async;
            waitqueue_active(&fs_info->async_submit_wait))
                wake_up(&fs_info->async_submit_wait);
 
-       async->submit_bio_hook(async->inode, async->rw, async->bio,
+       async->submit_bio_done(async->inode, async->rw, async->bio,
                               async->mirror_num, async->bio_flags);
+}
+
+static void run_one_async_free(struct btrfs_work *work)
+{
+       struct async_submit_bio *async;
+
+       async = container_of(work, struct async_submit_bio, work);
        kfree(async);
 }
 
 int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,
                        int rw, struct bio *bio, int mirror_num,
                        unsigned long bio_flags,
-                       extent_submit_bio_hook_t *submit_bio_hook)
+                       extent_submit_bio_hook_t *submit_bio_start,
+                       extent_submit_bio_hook_t *submit_bio_done)
 {
        struct async_submit_bio *async;
        int limit = btrfs_async_submit_limit(fs_info);
        async->rw = rw;
        async->bio = bio;
        async->mirror_num = mirror_num;
-       async->submit_bio_hook = submit_bio_hook;
-       async->work.func = run_one_async_submit;
+       async->submit_bio_start = submit_bio_start;
+       async->submit_bio_done = submit_bio_done;
+
+       async->work.func = run_one_async_start;
+       async->work.ordered_func = run_one_async_done;
+       async->work.ordered_free = run_one_async_free;
+
        async->work.flags = 0;
        async->bio_flags = bio_flags;
 
        return 0;
 }
 
-static int __btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
-                                int mirror_num, unsigned long bio_flags)
+static int __btree_submit_bio_start(struct inode *inode, int rw,
+                                   struct bio *bio, int mirror_num,
+                                   unsigned long bio_flags)
 {
-       struct btrfs_root *root = BTRFS_I(inode)->root;
-       int ret;
-
        /*
         * when we're called for a write, we're already in the async
-        * submission context.  Just jump into btrfs_map_bio
+        * submission context.  Just csum the bio; it is mapped to the
+        * device later from the ordered done hook, __btree_submit_bio_done
         */
-       if (rw & (1 << BIO_RW)) {
-               btree_csum_one_bio(bio);
-               return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio,
-                                    mirror_num, 1);
-       }
+       btree_csum_one_bio(bio);
+       return 0;
+}
 
+static int __btree_submit_bio_done(struct inode *inode, int rw, struct bio *bio,
+                                int mirror_num, unsigned long bio_flags)
+{
        /*
-        * called for a read, do the setup so that checksum validation
-        * can happen in the async kernel threads
+        * when we're called for a write, we're already in the async
+        * submission context and the csum was added by the start hook.
+        * Just jump into btrfs_map_bio
         */
-       ret = btrfs_bio_wq_end_io(root->fs_info, bio, 1);
-       BUG_ON(ret);
-
        return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num, 1);
 }
 
         * can happen in parallel across all CPUs
         */
        if (!(rw & (1 << BIO_RW))) {
-               return __btree_submit_bio_hook(inode, rw, bio, mirror_num, 0);
+               int ret;
+               /*
+                * called for a read, do the setup so that checksum validation
+                * can happen in the async kernel threads
+                */
+               ret = btrfs_bio_wq_end_io(BTRFS_I(inode)->root->fs_info,
+                                         bio, 1);
+               BUG_ON(ret);
+
+               return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio,
+                                    mirror_num, 1);
        }
        return btrfs_wq_submit_bio(BTRFS_I(inode)->root->fs_info,
                                   inode, rw, bio, mirror_num, 0,
-                                  __btree_submit_bio_hook);
+                                  __btree_submit_bio_start,
+                                  __btree_submit_bio_done);
 }
 
 static int btree_writepage(struct page *page, struct writeback_control *wbc)
         * were sent by the writeback daemons, improving overall locality
         * of the IO going down the pipe.
         */
-       fs_info->workers.idle_thresh = 128;
+       fs_info->workers.idle_thresh = 8;
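+       /*
+        * ordered mode makes run_ordered_completions call the submit_bio
+        * done hooks, and so btrfs_map_bio, in the order the bios were
+        * queued
+        */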
+       fs_info->workers.ordered = 1;
 
        btrfs_init_workers(&fs_info->fixup_workers, "fixup", 1);
        btrfs_init_workers(&fs_info->endio_workers, "endio",
 
 int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,
                        int rw, struct bio *bio, int mirror_num,
                        unsigned long bio_flags,
-                       extent_submit_bio_hook_t *submit_bio_hook);
+                       extent_submit_bio_hook_t *submit_bio_start,
+                       extent_submit_bio_hook_t *submit_bio_done);
+
 int btrfs_congested_async(struct btrfs_fs_info *info, int iodone);
 unsigned long btrfs_async_submit_limit(struct btrfs_fs_info *info);
 int btrfs_write_tree_block(struct extent_buffer *buf);
 
  * At IO completion time the csums attached on the ordered extent record
  * are inserted into the btree
  */
-int __btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
+int __btrfs_submit_bio_start(struct inode *inode, int rw, struct bio *bio,
                          int mirror_num, unsigned long bio_flags)
 {
        struct btrfs_root *root = BTRFS_I(inode)->root;
 
        ret = btrfs_csum_one_bio(root, inode, bio);
        BUG_ON(ret);
+       return 0;
+}
 
+/*
+ * the csums were attached onto the ordered extent record by
+ * __btrfs_submit_bio_start above.  This is the ordered done hook for
+ * the async submission; it just hands the bio down to btrfs_map_bio.
+ */
+int __btrfs_submit_bio_done(struct inode *inode, int rw, struct bio *bio,
+                         int mirror_num, unsigned long bio_flags)
+{
+       struct btrfs_root *root = BTRFS_I(inode)->root;
        return btrfs_map_bio(root, rw, bio, mirror_num, 1);
 }
 
                /* we're doing a write, do the async checksumming */
                return btrfs_wq_submit_bio(BTRFS_I(inode)->root->fs_info,
                                   inode, rw, bio, mirror_num,
-                                  bio_flags, __btrfs_submit_bio_hook);
+                                  bio_flags, __btrfs_submit_bio_start,
+                                  __btrfs_submit_bio_done);
        }
 
 mapit: