]> www.pilppa.org Git - linux-2.6-omap-h63xx.git/blob - drivers/md/dm-kcopyd.c
dm kcopyd: avoid queue shuffle
[linux-2.6-omap-h63xx.git] / drivers / md / dm-kcopyd.c
1 /*
2  * Copyright (C) 2002 Sistina Software (UK) Limited.
3  * Copyright (C) 2006 Red Hat GmbH
4  *
5  * This file is released under the GPL.
6  *
7  * Kcopyd provides a simple interface for copying an area of one
8  * block-device to one or more other block-devices, with an asynchronous
9  * completion notification.
10  */
11
12 #include <linux/types.h>
13 #include <asm/atomic.h>
14 #include <linux/blkdev.h>
15 #include <linux/fs.h>
16 #include <linux/init.h>
17 #include <linux/list.h>
18 #include <linux/mempool.h>
19 #include <linux/module.h>
20 #include <linux/pagemap.h>
21 #include <linux/slab.h>
22 #include <linux/vmalloc.h>
23 #include <linux/workqueue.h>
24 #include <linux/mutex.h>
25 #include <linux/dm-kcopyd.h>
26
27 #include "dm.h"
28
29 /*-----------------------------------------------------------------
30  * Each kcopyd client has its own little pool of preallocated
31  * pages for kcopyd io.
32  *---------------------------------------------------------------*/
33 struct dm_kcopyd_client {
34         spinlock_t lock;
35         struct page_list *pages;
36         unsigned int nr_pages;
37         unsigned int nr_free_pages;
38
39         struct dm_io_client *io_client;
40
41         wait_queue_head_t destroyq;
42         atomic_t nr_jobs;
43
44         mempool_t *job_pool;
45
46         struct workqueue_struct *kcopyd_wq;
47         struct work_struct kcopyd_work;
48
49 /*
50  * We maintain three lists of jobs:
51  *
52  * i)   jobs waiting for pages
53  * ii)  jobs that have pages, and are waiting for the io to be issued.
54  * iii) jobs that have completed.
55  *
56  * All three of these are protected by job_lock.
57  */
58         spinlock_t job_lock;
59         struct list_head complete_jobs;
60         struct list_head io_jobs;
61         struct list_head pages_jobs;
62 };
63
64 static void wake(struct dm_kcopyd_client *kc)
65 {
66         queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
67 }
68
69 static struct page_list *alloc_pl(void)
70 {
71         struct page_list *pl;
72
73         pl = kmalloc(sizeof(*pl), GFP_KERNEL);
74         if (!pl)
75                 return NULL;
76
77         pl->page = alloc_page(GFP_KERNEL);
78         if (!pl->page) {
79                 kfree(pl);
80                 return NULL;
81         }
82
83         return pl;
84 }
85
86 static void free_pl(struct page_list *pl)
87 {
88         __free_page(pl->page);
89         kfree(pl);
90 }
91
92 static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
93                             unsigned int nr, struct page_list **pages)
94 {
95         struct page_list *pl;
96
97         spin_lock(&kc->lock);
98         if (kc->nr_free_pages < nr) {
99                 spin_unlock(&kc->lock);
100                 return -ENOMEM;
101         }
102
103         kc->nr_free_pages -= nr;
104         for (*pages = pl = kc->pages; --nr; pl = pl->next)
105                 ;
106
107         kc->pages = pl->next;
108         pl->next = NULL;
109
110         spin_unlock(&kc->lock);
111
112         return 0;
113 }
114
115 static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
116 {
117         struct page_list *cursor;
118
119         spin_lock(&kc->lock);
120         for (cursor = pl; cursor->next; cursor = cursor->next)
121                 kc->nr_free_pages++;
122
123         kc->nr_free_pages++;
124         cursor->next = kc->pages;
125         kc->pages = pl;
126         spin_unlock(&kc->lock);
127 }
128
129 /*
130  * These three functions resize the page pool.
131  */
132 static void drop_pages(struct page_list *pl)
133 {
134         struct page_list *next;
135
136         while (pl) {
137                 next = pl->next;
138                 free_pl(pl);
139                 pl = next;
140         }
141 }
142
143 static int client_alloc_pages(struct dm_kcopyd_client *kc, unsigned int nr)
144 {
145         unsigned int i;
146         struct page_list *pl = NULL, *next;
147
148         for (i = 0; i < nr; i++) {
149                 next = alloc_pl();
150                 if (!next) {
151                         if (pl)
152                                 drop_pages(pl);
153                         return -ENOMEM;
154                 }
155                 next->next = pl;
156                 pl = next;
157         }
158
159         kcopyd_put_pages(kc, pl);
160         kc->nr_pages += nr;
161         return 0;
162 }
163
164 static void client_free_pages(struct dm_kcopyd_client *kc)
165 {
166         BUG_ON(kc->nr_free_pages != kc->nr_pages);
167         drop_pages(kc->pages);
168         kc->pages = NULL;
169         kc->nr_free_pages = kc->nr_pages = 0;
170 }
171
172 /*-----------------------------------------------------------------
173  * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
174  * for this reason we use a mempool to prevent the client from
175  * ever having to do io (which could cause a deadlock).
176  *---------------------------------------------------------------*/
177 struct kcopyd_job {
178         struct dm_kcopyd_client *kc;
179         struct list_head list;
180         unsigned long flags;
181
182         /*
183          * Error state of the job.
184          */
185         int read_err;
186         unsigned long write_err;
187
188         /*
189          * Either READ or WRITE
190          */
191         int rw;
192         struct dm_io_region source;
193
194         /*
195          * The destinations for the transfer.
196          */
197         unsigned int num_dests;
198         struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];
199
200         sector_t offset;
201         unsigned int nr_pages;
202         struct page_list *pages;
203
204         /*
205          * Set this to ensure you are notified when the job has
206          * completed.  'context' is for callback to use.
207          */
208         dm_kcopyd_notify_fn fn;
209         void *context;
210
211         /*
212          * These fields are only used if the job has been split
213          * into more manageable parts.
214          */
215         struct mutex lock;
216         atomic_t sub_jobs;
217         sector_t progress;
218 };
219
220 /* FIXME: this should scale with the number of pages */
221 #define MIN_JOBS 512
222
223 static struct kmem_cache *_job_cache;
224
225 int __init dm_kcopyd_init(void)
226 {
227         _job_cache = KMEM_CACHE(kcopyd_job, 0);
228         if (!_job_cache)
229                 return -ENOMEM;
230
231         return 0;
232 }
233
234 void dm_kcopyd_exit(void)
235 {
236         kmem_cache_destroy(_job_cache);
237         _job_cache = NULL;
238 }
239
240 /*
241  * Functions to push and pop a job onto the head of a given job
242  * list.
243  */
244 static struct kcopyd_job *pop(struct list_head *jobs,
245                               struct dm_kcopyd_client *kc)
246 {
247         struct kcopyd_job *job = NULL;
248         unsigned long flags;
249
250         spin_lock_irqsave(&kc->job_lock, flags);
251
252         if (!list_empty(jobs)) {
253                 job = list_entry(jobs->next, struct kcopyd_job, list);
254                 list_del(&job->list);
255         }
256         spin_unlock_irqrestore(&kc->job_lock, flags);
257
258         return job;
259 }
260
261 static void push(struct list_head *jobs, struct kcopyd_job *job)
262 {
263         unsigned long flags;
264         struct dm_kcopyd_client *kc = job->kc;
265
266         spin_lock_irqsave(&kc->job_lock, flags);
267         list_add_tail(&job->list, jobs);
268         spin_unlock_irqrestore(&kc->job_lock, flags);
269 }
270
271
272 static void push_head(struct list_head *jobs, struct kcopyd_job *job)
273 {
274         unsigned long flags;
275         struct dm_kcopyd_client *kc = job->kc;
276
277         spin_lock_irqsave(&kc->job_lock, flags);
278         list_add(&job->list, jobs);
279         spin_unlock_irqrestore(&kc->job_lock, flags);
280 }
281
282 /*
283  * These three functions process 1 item from the corresponding
284  * job list.
285  *
286  * They return:
287  * < 0: error
288  *   0: success
289  * > 0: can't process yet.
290  */
291 static int run_complete_job(struct kcopyd_job *job)
292 {
293         void *context = job->context;
294         int read_err = job->read_err;
295         unsigned long write_err = job->write_err;
296         dm_kcopyd_notify_fn fn = job->fn;
297         struct dm_kcopyd_client *kc = job->kc;
298
299         kcopyd_put_pages(kc, job->pages);
300         mempool_free(job, kc->job_pool);
301         fn(read_err, write_err, context);
302
303         if (atomic_dec_and_test(&kc->nr_jobs))
304                 wake_up(&kc->destroyq);
305
306         return 0;
307 }
308
309 static void complete_io(unsigned long error, void *context)
310 {
311         struct kcopyd_job *job = (struct kcopyd_job *) context;
312         struct dm_kcopyd_client *kc = job->kc;
313
314         if (error) {
315                 if (job->rw == WRITE)
316                         job->write_err |= error;
317                 else
318                         job->read_err = 1;
319
320                 if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
321                         push(&kc->complete_jobs, job);
322                         wake(kc);
323                         return;
324                 }
325         }
326
327         if (job->rw == WRITE)
328                 push(&kc->complete_jobs, job);
329
330         else {
331                 job->rw = WRITE;
332                 push(&kc->io_jobs, job);
333         }
334
335         wake(kc);
336 }
337
338 /*
339  * Request io on as many buffer heads as we can currently get for
340  * a particular job.
341  */
342 static int run_io_job(struct kcopyd_job *job)
343 {
344         int r;
345         struct dm_io_request io_req = {
346                 .bi_rw = job->rw | (1 << BIO_RW_SYNC),
347                 .mem.type = DM_IO_PAGE_LIST,
348                 .mem.ptr.pl = job->pages,
349                 .mem.offset = job->offset,
350                 .notify.fn = complete_io,
351                 .notify.context = job,
352                 .client = job->kc->io_client,
353         };
354
355         if (job->rw == READ)
356                 r = dm_io(&io_req, 1, &job->source, NULL);
357         else
358                 r = dm_io(&io_req, job->num_dests, job->dests, NULL);
359
360         return r;
361 }
362
363 static int run_pages_job(struct kcopyd_job *job)
364 {
365         int r;
366
367         job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
368                                   PAGE_SIZE >> 9);
369         r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
370         if (!r) {
371                 /* this job is ready for io */
372                 push(&job->kc->io_jobs, job);
373                 return 0;
374         }
375
376         if (r == -ENOMEM)
377                 /* can't complete now */
378                 return 1;
379
380         return r;
381 }
382
383 /*
384  * Run through a list for as long as possible.  Returns the count
385  * of successful jobs.
386  */
387 static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
388                         int (*fn) (struct kcopyd_job *))
389 {
390         struct kcopyd_job *job;
391         int r, count = 0;
392
393         while ((job = pop(jobs, kc))) {
394
395                 r = fn(job);
396
397                 if (r < 0) {
398                         /* error this rogue job */
399                         if (job->rw == WRITE)
400                                 job->write_err = (unsigned long) -1L;
401                         else
402                                 job->read_err = 1;
403                         push(&kc->complete_jobs, job);
404                         break;
405                 }
406
407                 if (r > 0) {
408                         /*
409                          * We couldn't service this job ATM, so
410                          * push this job back onto the list.
411                          */
412                         push_head(jobs, job);
413                         break;
414                 }
415
416                 count++;
417         }
418
419         return count;
420 }
421
422 /*
423  * kcopyd does this every time it's woken up.
424  */
425 static void do_work(struct work_struct *work)
426 {
427         struct dm_kcopyd_client *kc = container_of(work,
428                                         struct dm_kcopyd_client, kcopyd_work);
429
430         /*
431          * The order that these are called is *very* important.
432          * complete jobs can free some pages for pages jobs.
433          * Pages jobs when successful will jump onto the io jobs
434          * list.  io jobs call wake when they complete and it all
435          * starts again.
436          */
437         process_jobs(&kc->complete_jobs, kc, run_complete_job);
438         process_jobs(&kc->pages_jobs, kc, run_pages_job);
439         process_jobs(&kc->io_jobs, kc, run_io_job);
440 }
441
442 /*
443  * If we are copying a small region we just dispatch a single job
444  * to do the copy, otherwise the io has to be split up into many
445  * jobs.
446  */
447 static void dispatch_job(struct kcopyd_job *job)
448 {
449         struct dm_kcopyd_client *kc = job->kc;
450         atomic_inc(&kc->nr_jobs);
451         push(&kc->pages_jobs, job);
452         wake(kc);
453 }
454
455 #define SUB_JOB_SIZE 128
456 static void segment_complete(int read_err, unsigned long write_err,
457                              void *context)
458 {
459         /* FIXME: tidy this function */
460         sector_t progress = 0;
461         sector_t count = 0;
462         struct kcopyd_job *job = (struct kcopyd_job *) context;
463
464         mutex_lock(&job->lock);
465
466         /* update the error */
467         if (read_err)
468                 job->read_err = 1;
469
470         if (write_err)
471                 job->write_err |= write_err;
472
473         /*
474          * Only dispatch more work if there hasn't been an error.
475          */
476         if ((!job->read_err && !job->write_err) ||
477             test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
478                 /* get the next chunk of work */
479                 progress = job->progress;
480                 count = job->source.count - progress;
481                 if (count) {
482                         if (count > SUB_JOB_SIZE)
483                                 count = SUB_JOB_SIZE;
484
485                         job->progress += count;
486                 }
487         }
488         mutex_unlock(&job->lock);
489
490         if (count) {
491                 int i;
492                 struct kcopyd_job *sub_job = mempool_alloc(job->kc->job_pool,
493                                                            GFP_NOIO);
494
495                 *sub_job = *job;
496                 sub_job->source.sector += progress;
497                 sub_job->source.count = count;
498
499                 for (i = 0; i < job->num_dests; i++) {
500                         sub_job->dests[i].sector += progress;
501                         sub_job->dests[i].count = count;
502                 }
503
504                 sub_job->fn = segment_complete;
505                 sub_job->context = job;
506                 dispatch_job(sub_job);
507
508         } else if (atomic_dec_and_test(&job->sub_jobs)) {
509
510                 /*
511                  * To avoid a race we must keep the job around
512                  * until after the notify function has completed.
513                  * Otherwise the client may try and stop the job
514                  * after we've completed.
515                  */
516                 job->fn(read_err, write_err, job->context);
517                 mempool_free(job, job->kc->job_pool);
518         }
519 }
520
521 /*
522  * Create some little jobs that will do the move between
523  * them.
524  */
525 #define SPLIT_COUNT 8
526 static void split_job(struct kcopyd_job *job)
527 {
528         int i;
529
530         atomic_set(&job->sub_jobs, SPLIT_COUNT);
531         for (i = 0; i < SPLIT_COUNT; i++)
532                 segment_complete(0, 0u, job);
533 }
534
535 int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
536                    unsigned int num_dests, struct dm_io_region *dests,
537                    unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
538 {
539         struct kcopyd_job *job;
540
541         /*
542          * Allocate a new job.
543          */
544         job = mempool_alloc(kc->job_pool, GFP_NOIO);
545
546         /*
547          * set up for the read.
548          */
549         job->kc = kc;
550         job->flags = flags;
551         job->read_err = 0;
552         job->write_err = 0;
553         job->rw = READ;
554
555         job->source = *from;
556
557         job->num_dests = num_dests;
558         memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
559
560         job->offset = 0;
561         job->nr_pages = 0;
562         job->pages = NULL;
563
564         job->fn = fn;
565         job->context = context;
566
567         if (job->source.count < SUB_JOB_SIZE)
568                 dispatch_job(job);
569
570         else {
571                 mutex_init(&job->lock);
572                 job->progress = 0;
573                 split_job(job);
574         }
575
576         return 0;
577 }
578 EXPORT_SYMBOL(dm_kcopyd_copy);
579
580 /*
581  * Cancels a kcopyd job, eg. someone might be deactivating a
582  * mirror.
583  */
584 #if 0
585 int kcopyd_cancel(struct kcopyd_job *job, int block)
586 {
587         /* FIXME: finish */
588         return -1;
589 }
590 #endif  /*  0  */
591
592 /*-----------------------------------------------------------------
593  * Client setup
594  *---------------------------------------------------------------*/
595 int dm_kcopyd_client_create(unsigned int nr_pages,
596                             struct dm_kcopyd_client **result)
597 {
598         int r = -ENOMEM;
599         struct dm_kcopyd_client *kc;
600
601         kc = kmalloc(sizeof(*kc), GFP_KERNEL);
602         if (!kc)
603                 return -ENOMEM;
604
605         spin_lock_init(&kc->lock);
606         spin_lock_init(&kc->job_lock);
607         INIT_LIST_HEAD(&kc->complete_jobs);
608         INIT_LIST_HEAD(&kc->io_jobs);
609         INIT_LIST_HEAD(&kc->pages_jobs);
610
611         kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
612         if (!kc->job_pool)
613                 goto bad_slab;
614
615         INIT_WORK(&kc->kcopyd_work, do_work);
616         kc->kcopyd_wq = create_singlethread_workqueue("kcopyd");
617         if (!kc->kcopyd_wq)
618                 goto bad_workqueue;
619
620         kc->pages = NULL;
621         kc->nr_pages = kc->nr_free_pages = 0;
622         r = client_alloc_pages(kc, nr_pages);
623         if (r)
624                 goto bad_client_pages;
625
626         kc->io_client = dm_io_client_create(nr_pages);
627         if (IS_ERR(kc->io_client)) {
628                 r = PTR_ERR(kc->io_client);
629                 goto bad_io_client;
630         }
631
632         init_waitqueue_head(&kc->destroyq);
633         atomic_set(&kc->nr_jobs, 0);
634
635         *result = kc;
636         return 0;
637
638 bad_io_client:
639         client_free_pages(kc);
640 bad_client_pages:
641         destroy_workqueue(kc->kcopyd_wq);
642 bad_workqueue:
643         mempool_destroy(kc->job_pool);
644 bad_slab:
645         kfree(kc);
646
647         return r;
648 }
649 EXPORT_SYMBOL(dm_kcopyd_client_create);
650
651 void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
652 {
653         /* Wait for completion of all jobs submitted by this client. */
654         wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
655
656         BUG_ON(!list_empty(&kc->complete_jobs));
657         BUG_ON(!list_empty(&kc->io_jobs));
658         BUG_ON(!list_empty(&kc->pages_jobs));
659         destroy_workqueue(kc->kcopyd_wq);
660         dm_io_client_destroy(kc->io_client);
661         client_free_pages(kc);
662         mempool_destroy(kc->job_pool);
663         kfree(kc);
664 }
665 EXPORT_SYMBOL(dm_kcopyd_client_destroy);