Fix crash after recursive aliasing
[capablanca.git] / lasker-2.2.3 / src / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3    Samba database functions
4    Copyright (C) Andrew Tridgell              1999-2000
5    Copyright (C) Luke Kenneth Casson Leighton      2000
6    Copyright (C) Paul `Rusty' Russell              2000
7    Copyright (C) Jeremy Allison                    2000
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23 #ifdef STANDALONE
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32 #include <string.h>
33 #include <fcntl.h>
34 #include <errno.h>
35 #include <sys/mman.h>
36 #include <sys/stat.h>
37 #include "tdb.h"
38 #include "spinlock.h"
39 #else
40 #include "includes.h"
41 #endif
42
43 #define TDB_MAGIC_FOOD "TDB file\n"
44 #define TDB_VERSION (0x26011967 + 6)
45 #define TDB_MAGIC (0x26011999U)
46 #define TDB_FREE_MAGIC (~TDB_MAGIC)
47 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
48 #define TDB_ALIGNMENT 4
49 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
50 #define DEFAULT_HASH_SIZE 131
51 #define TDB_PAGE_SIZE 0x2000
52 #define FREELIST_TOP (sizeof(struct tdb_header))
53 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
54 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
55 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
56 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
57 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
58
59 /* NB assumes there is a local variable called "tdb" that is the
60  * current context, also takes doubly-parenthesized print-style
61  * argument. */
62 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
63
64 /* lock offsets */
65 #define GLOBAL_LOCK 0
66 #define ACTIVE_LOCK 4
67
68 #ifndef MAP_FILE
69 #define MAP_FILE 0
70 #endif
71
72 #ifndef MAP_FAILED
73 #define MAP_FAILED ((void *)-1)
74 #endif
75
76 /* free memory if the pointer is valid and zero the pointer */
77 #ifndef SAFE_FREE
78 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
79 #endif
80
81 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
82 TDB_DATA tdb_null;
83
84 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
85 static TDB_CONTEXT *tdbs = NULL;
86
87 static int tdb_munmap(TDB_CONTEXT *tdb)
88 {
89         if (tdb->flags & TDB_INTERNAL)
90                 return 0;
91
92 #ifdef HAVE_MMAP
93         if (tdb->map_ptr) {
94                 int ret = munmap(tdb->map_ptr, tdb->map_size);
95                 if (ret != 0)
96                         return ret;
97         }
98 #endif
99         tdb->map_ptr = NULL;
100         return 0;
101 }
102
103 static void tdb_mmap(TDB_CONTEXT *tdb)
104 {
105         if (tdb->flags & TDB_INTERNAL)
106                 return;
107
108 #ifdef HAVE_MMAP
109         if (!(tdb->flags & TDB_NOMMAP)) {
110                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
111                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
112                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
113
114                 /*
115                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
116                  */
117
118                 if (tdb->map_ptr == MAP_FAILED) {
119                         tdb->map_ptr = NULL;
120                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
121                                  tdb->map_size, strerror(errno)));
122                 }
123         } else {
124                 tdb->map_ptr = NULL;
125         }
126 #else
127         tdb->map_ptr = NULL;
128 #endif
129 }
130
131 /* Endian conversion: we only ever deal with 4 byte quantities */
132 static void *convert(void *buf, u32 size)
133 {
134         u32 i, *p = buf;
135         for (i = 0; i < size / 4; i++)
136                 p[i] = TDB_BYTEREV(p[i]);
137         return buf;
138 }
139 #define DOCONV() (tdb->flags & TDB_CONVERT)
140 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
141
142 /* the body of the database is made of one list_struct for the free space
143    plus a separate data list for each hash value */
144 struct list_struct {
145         tdb_off next; /* offset of the next record in the list */
146         tdb_len rec_len; /* total byte length of record */
147         tdb_len key_len; /* byte length of key */
148         tdb_len data_len; /* byte length of data */
149         u32 full_hash; /* the full 32 bit hash of the key */
150         u32 magic;   /* try to catch errors */
151         /* the following union is implied:
152                 union {
153                         char record[rec_len];
154                         struct {
155                                 char key[key_len];
156                                 char data[data_len];
157                         }
158                         u32 totalsize; (tailer)
159                 }
160         */
161 };
162
163 /* a byte range locking function - return 0 on success
164    this functions locks/unlocks 1 byte at the specified offset.
165
166    On error, errno is also set so that errors are passed back properly
167    through tdb_open(). */
168 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
169                       int rw_type, int lck_type, int probe)
170 {
171         struct flock fl;
172         int ret;
173
174         if (tdb->flags & TDB_NOLOCK)
175                 return 0;
176         if (tdb->read_only) {
177                 errno = EACCES;
178                 return -1;
179         }
180
181         fl.l_type = rw_type;
182         fl.l_whence = SEEK_SET;
183         fl.l_start = offset;
184         fl.l_len = 1;
185         fl.l_pid = 0;
186
187         do {
188                 ret = fcntl(tdb->fd,lck_type,&fl);
189         } while (ret == -1 && errno == EINTR);
190
191         if (ret == -1) {
192                 if (!probe) {
193                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
194                                  tdb->fd, offset, rw_type, lck_type));
195                 }
196                 /* errno set by fcntl */
197                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
198         }
199         return 0;
200 }
201
202 /* lock a list in the database. list -1 is the alloc list */
203 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
204 {
205         if (list < -1 || list >= (int)tdb->header.hash_size) {
206                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
207                            list, ltype));
208                 return -1;
209         }
210         if (tdb->flags & TDB_NOLOCK)
211                 return 0;
212
213         /* Since fcntl locks don't nest, we do a lock for the first one,
214            and simply bump the count for future ones */
215         if (tdb->locked[list+1].count == 0) {
216                 if (!tdb->read_only && tdb->header.rwlocks) {
217                         if (tdb_spinlock(tdb, list, ltype)) {
218                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list ltype=%d\n", 
219                                            list, ltype));
220                                 return -1;
221                         }
222                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
223                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
224                                            list, ltype, strerror(errno)));
225                         return -1;
226                 }
227                 tdb->locked[list+1].ltype = ltype;
228         }
229         tdb->locked[list+1].count++;
230         return 0;
231 }
232
233 /* unlock the database: returns void because it's too late for errors. */
234         /* changed to return int it may be interesting to know there
235            has been an error  --simo */
236 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
237 {
238         int ret = -1;
239
240         if (tdb->flags & TDB_NOLOCK)
241                 return 0;
242
243         /* Sanity checks */
244         if (list < -1 || list >= (int)tdb->header.hash_size) {
245                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
246                 return ret;
247         }
248
249         if (tdb->locked[list+1].count==0) {
250                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
251                 return ret;
252         }
253
254         if (tdb->locked[list+1].count == 1) {
255                 /* Down to last nested lock: unlock underneath */
256                 if (!tdb->read_only && tdb->header.rwlocks) {
257                         ret = tdb_spinunlock(tdb, list, ltype);
258                 } else {
259                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
260                 }
261         } else {
262                 ret = 0;
263         }
264         tdb->locked[list+1].count--;
265
266         if (ret)
267                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
268         return ret;
269 }
270
271 /* This is based on the hash algorithm from gdbm */
272 static u32 tdb_hash(TDB_DATA *key)
273 {
274         u32 value;      /* Used to compute the hash value.  */
275         u32   i;        /* Used to cycle through random values. */
276
277         /* Set the initial value from the key size. */
278         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
279                 value = (value + (key->dptr[i] << (i*5 % 24)));
280
281         return (1103515243 * value + 12345);  
282 }
283
284 /* check for an out of bounds access - if it is out of bounds then
285    see if the database has been expanded by someone else and expand
286    if necessary 
287    note that "len" is the minimum length needed for the db
288 */
289 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
290 {
291         struct stat st;
292         if (len <= tdb->map_size)
293                 return 0;
294         if (tdb->flags & TDB_INTERNAL) {
295                 if (!probe) {
296                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
297                                  (int)len, (int)tdb->map_size));
298                 }
299                 return TDB_ERRCODE(TDB_ERR_IO, -1);
300         }
301
302         if (fstat(tdb->fd, &st) == -1)
303                 return TDB_ERRCODE(TDB_ERR_IO, -1);
304
305         if (st.st_size < (size_t)len) {
306                 if (!probe) {
307                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
308                                  (int)len, (int)st.st_size));
309                 }
310                 return TDB_ERRCODE(TDB_ERR_IO, -1);
311         }
312
313         /* Unmap, update size, remap */
314         if (tdb_munmap(tdb) == -1)
315                 return TDB_ERRCODE(TDB_ERR_IO, -1);
316         tdb->map_size = st.st_size;
317         tdb_mmap(tdb);
318         return 0;
319 }
320
321 /* write a lump of data at a specified offset */
322 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, const void *buf, tdb_len len)
323 {
324         if (tdb_oob(tdb, off + len, 0) != 0)
325                 return -1;
326
327         if (tdb->map_ptr)
328                 memcpy(off + (char *)tdb->map_ptr, buf, len);
329 #ifdef HAVE_PWRITE
330         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
331 #else
332         else if (lseek(tdb->fd, off, SEEK_SET) != off
333                  || write(tdb->fd, buf, len) != (ssize_t)len) {
334 #endif
335                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
336                            off, len, strerror(errno)));
337                 return TDB_ERRCODE(TDB_ERR_IO, -1);
338         }
339         return 0;
340 }
341
342 /* read a lump of data at a specified offset, maybe convert */
343 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
344 {
345         if (tdb_oob(tdb, off + len, 0) != 0)
346                 return -1;
347
348         if (tdb->map_ptr)
349                 memcpy(buf, off + (char *)tdb->map_ptr, len);
350 #ifdef HAVE_PREAD
351         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
352 #else
353         else if (lseek(tdb->fd, off, SEEK_SET) != off
354                  || read(tdb->fd, buf, len) != (ssize_t)len) {
355 #endif
356                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
357                            off, len, strerror(errno)));
358                 return TDB_ERRCODE(TDB_ERR_IO, -1);
359         }
360         if (cv)
361                 convert(buf, len);
362         return 0;
363 }
364
365 /* read a lump of data, allocating the space for it */
366 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
367 {
368         char *buf;
369
370         if (!(buf = malloc(len))) {
371                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
372                            len, strerror(errno)));
373                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
374         }
375         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
376                 SAFE_FREE(buf);
377                 return NULL;
378         }
379         return buf;
380 }
381
382 /* read/write a tdb_off */
383 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
384 {
385         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
386 }
387 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
388 {
389         tdb_off off = *d;
390         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
391 }
392
393 /* read/write a record */
394 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
395 {
396         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
397                 return -1;
398         if (TDB_BAD_MAGIC(rec)) {
399                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
400                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
401         }
402         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
403 }
404 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
405 {
406         struct list_struct r = *rec;
407         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
408 }
409
410 /* read a freelist record and check for simple errors */
411 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
412 {
413         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
414                 return -1;
415
416         if (rec->magic == TDB_MAGIC) {
417                 /* this happens when a app is showdown while deleting a record - we should
418                    not completely fail when this happens */
419                 TDB_LOG((tdb, 0,"rec_free_read non-free magic at offset=%d - fixing\n", 
420                          rec->magic, off));
421                 rec->magic = TDB_FREE_MAGIC;
422                 tdb_write(tdb, off, rec, sizeof(*rec));
423         }
424
425         if (rec->magic != TDB_FREE_MAGIC) {
426                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
427                            rec->magic, off));
428                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
429         }
430         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
431                 return -1;
432         return 0;
433 }
434
435 /* update a record tailer (must hold allocation lock) */
436 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
437                          const struct list_struct *rec)
438 {
439         tdb_off totalsize;
440
441         /* Offset of tailer from record header */
442         totalsize = sizeof(*rec) + rec->rec_len;
443         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
444                          &totalsize);
445 }
446
447 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
448 {
449         struct list_struct rec;
450         tdb_off tailer_ofs, tailer;
451
452         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
453                 printf("ERROR: failed to read record at %u\n", offset);
454                 return 0;
455         }
456
457         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
458                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
459
460         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
461         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
462                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
463                 return rec.next;
464         }
465
466         if (tailer != rec.rec_len + sizeof(rec)) {
467                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
468                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
469         }
470         return rec.next;
471 }
472
473 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
474 {
475         tdb_off rec_ptr, top;
476
477         top = TDB_HASH_TOP(i);
478
479         if (tdb_lock(tdb, i, F_WRLCK) != 0)
480                 return -1;
481
482         if (ofs_read(tdb, top, &rec_ptr) == -1)
483                 return tdb_unlock(tdb, i, F_WRLCK);
484
485         if (rec_ptr)
486                 printf("hash=%d\n", i);
487
488         while (rec_ptr) {
489                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
490         }
491
492         return tdb_unlock(tdb, i, F_WRLCK);
493 }
494
495 void tdb_dump_all(TDB_CONTEXT *tdb)
496 {
497         int i;
498         for (i=0;i<tdb->header.hash_size;i++) {
499                 tdb_dump_chain(tdb, i);
500         }
501         printf("freelist:\n");
502         tdb_dump_chain(tdb, -1);
503 }
504
505 int tdb_printfreelist(TDB_CONTEXT *tdb)
506 {
507         int ret;
508         long total_free = 0;
509         tdb_off offset, rec_ptr;
510         struct list_struct rec;
511
512         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
513                 return ret;
514
515         offset = FREELIST_TOP;
516
517         /* read in the freelist top */
518         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
519                 return 0;
520         }
521
522         printf("freelist top=[0x%08x]\n", rec_ptr );
523         while (rec_ptr) {
524                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
525                         return -1;
526                 }
527
528                 if (rec.magic != TDB_FREE_MAGIC) {
529                         printf("bad magic 0x%08x in free list\n", rec.magic);
530                         return -1;
531                 }
532
533                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
534                 total_free += rec.rec_len;
535
536                 /* move to the next record */
537                 rec_ptr = rec.next;
538         }
539         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
540                (int)total_free);
541
542         return tdb_unlock(tdb, -1, F_WRLCK);
543 }
544
545 /* Remove an element from the freelist.  Must have alloc lock. */
546 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
547 {
548         tdb_off last_ptr, i;
549
550         /* read in the freelist top */
551         last_ptr = FREELIST_TOP;
552         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
553                 if (i == off) {
554                         /* We've found it! */
555                         return ofs_write(tdb, last_ptr, &next);
556                 }
557                 /* Follow chain (next offset is at start of record) */
558                 last_ptr = i;
559         }
560         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
561         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
562 }
563
564 /* Add an element into the freelist. Merge adjacent records if
565    neccessary. */
566 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
567 {
568         tdb_off right, left;
569
570         /* Allocation and tailer lock */
571         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
572                 return -1;
573
574         /* set an initial tailer, so if we fail we don't leave a bogus record */
575         if (update_tailer(tdb, offset, rec) != 0) {
576                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
577                 goto fail;
578         }
579
580         /* Look right first (I'm an Australian, dammit) */
581         right = offset + sizeof(*rec) + rec->rec_len;
582         if (right + sizeof(*rec) <= tdb->map_size) {
583                 struct list_struct r;
584
585                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
586                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
587                         goto left;
588                 }
589
590                 /* If it's free, expand to include it. */
591                 if (r.magic == TDB_FREE_MAGIC) {
592                         if (remove_from_freelist(tdb, right, r.next) == -1) {
593                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
594                                 goto left;
595                         }
596                         rec->rec_len += sizeof(r) + r.rec_len;
597                 }
598         }
599
600 left:
601         /* Look left */
602         left = offset - sizeof(tdb_off);
603         if (left > TDB_HASH_TOP(tdb->header.hash_size-1)) {
604                 struct list_struct l;
605                 tdb_off leftsize;
606
607                 /* Read in tailer and jump back to header */
608                 if (ofs_read(tdb, left, &leftsize) == -1) {
609                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
610                         goto update;
611                 }
612                 left = offset - leftsize;
613
614                 /* Now read in record */
615                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
616                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
617                         goto update;
618                 }
619
620                 /* If it's free, expand to include it. */
621                 if (l.magic == TDB_FREE_MAGIC) {
622                         if (remove_from_freelist(tdb, left, l.next) == -1) {
623                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
624                                 goto update;
625                         } else {
626                                 offset = left;
627                                 rec->rec_len += leftsize;
628                         }
629                 }
630         }
631
632 update:
633         if (update_tailer(tdb, offset, rec) == -1) {
634                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
635                 goto fail;
636         }
637
638         /* Now, prepend to free list */
639         rec->magic = TDB_FREE_MAGIC;
640
641         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
642             rec_write(tdb, offset, rec) == -1 ||
643             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
644                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
645                 goto fail;
646         }
647
648         /* And we're done. */
649         tdb_unlock(tdb, -1, F_WRLCK);
650         return 0;
651
652  fail:
653         tdb_unlock(tdb, -1, F_WRLCK);
654         return -1;
655 }
656
657
658 /* expand a file.  we prefer to use ftruncate, as that is what posix
659   says to use for mmap expansion */
660 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
661 {
662         char buf[1024];
663 #if HAVE_FTRUNCATE_EXTEND
664         if (ftruncate(tdb->fd, size+addition) != 0) {
665                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
666                            size+addition, strerror(errno)));
667                 return -1;
668         }
669 #else
670         char b = 0;
671
672 #ifdef HAVE_PWRITE
673         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
674 #else
675         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
676             write(tdb->fd, &b, 1) != 1) {
677 #endif
678                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
679                            size+addition, strerror(errno)));
680                 return -1;
681         }
682 #endif
683
684         /* now fill the file with something. This ensures that the file isn't sparse, which would be
685            very bad if we ran out of disk. This must be done with write, not via mmap */
686         memset(buf, 0x42, sizeof(buf));
687         while (addition) {
688                 int n = addition>sizeof(buf)?sizeof(buf):addition;
689 #ifdef HAVE_PWRITE
690                 int ret = pwrite(tdb->fd, buf, n, size);
691 #else
692                 int ret;
693                 if (lseek(tdb->fd, size, SEEK_SET) != size)
694                         return -1;
695                 ret = write(tdb->fd, buf, n);
696 #endif
697                 if (ret != n) {
698                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
699                                    n, strerror(errno)));
700                         return -1;
701                 }
702                 addition -= n;
703                 size += n;
704         }
705         return 0;
706 }
707
708
709 /* expand the database at least size bytes by expanding the underlying
710    file and doing the mmap again if necessary */
711 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
712 {
713         struct list_struct rec;
714         tdb_off offset;
715
716         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
717                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
718                 return -1;
719         }
720
721         /* must know about any previous expansions by another process */
722         tdb_oob(tdb, tdb->map_size + 1, 1);
723
724         /* always make room for at least 10 more records, and round
725            the database up to a multiple of TDB_PAGE_SIZE */
726         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
727
728         if (!(tdb->flags & TDB_INTERNAL))
729                 tdb_munmap(tdb);
730
731         /*
732          * We must ensure the file is unmapped before doing this
733          * to ensure consistency with systems like OpenBSD where
734          * writes and mmaps are not consistent.
735          */
736
737         /* expand the file itself */
738         if (!(tdb->flags & TDB_INTERNAL)) {
739                 if (expand_file(tdb, tdb->map_size, size) != 0)
740                         goto fail;
741         }
742
743         tdb->map_size += size;
744
745         if (tdb->flags & TDB_INTERNAL)
746                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
747         else {
748                 /*
749                  * We must ensure the file is remapped before adding the space
750                  * to ensure consistency with systems like OpenBSD where
751                  * writes and mmaps are not consistent.
752                  */
753
754                 /* We're ok if the mmap fails as we'll fallback to read/write */
755                 tdb_mmap(tdb);
756         }
757
758         /* form a new freelist record */
759         memset(&rec,'\0',sizeof(rec));
760         rec.rec_len = size - sizeof(rec);
761
762         /* link it into the free list */
763         offset = tdb->map_size - size;
764         if (tdb_free(tdb, offset, &rec) == -1)
765                 goto fail;
766
767         tdb_unlock(tdb, -1, F_WRLCK);
768         return 0;
769  fail:
770         tdb_unlock(tdb, -1, F_WRLCK);
771         return -1;
772 }
773
774 /* allocate some space from the free list. The offset returned points
775    to a unconnected list_struct within the database with room for at
776    least length bytes of total data
777
778    0 is returned if the space could not be allocated
779  */
780 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
781                             struct list_struct *rec)
782 {
783         tdb_off rec_ptr, last_ptr, newrec_ptr;
784         struct list_struct newrec;
785
786         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
787                 return 0;
788
789         /* Extra bytes required for tailer */
790         length += sizeof(tdb_off);
791
792  again:
793         last_ptr = FREELIST_TOP;
794
795         /* read in the freelist top */
796         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
797                 goto fail;
798
799         /* keep looking until we find a freelist record big enough */
800         while (rec_ptr) {
801                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
802                         goto fail;
803
804                 if (rec->rec_len >= length) {
805                         /* found it - now possibly split it up  */
806                         if (rec->rec_len > length + MIN_REC_SIZE) {
807                                 /* Length of left piece */
808                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
809
810                                 /* Right piece to go on free list */
811                                 newrec.rec_len = rec->rec_len
812                                         - (sizeof(*rec) + length);
813                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
814
815                                 /* And left record is shortened */
816                                 rec->rec_len = length;
817                         } else
818                                 newrec_ptr = 0;
819
820                         /* Remove allocated record from the free list */
821                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
822                                 goto fail;
823
824                         /* Update header: do this before we drop alloc
825                            lock, otherwise tdb_free() might try to
826                            merge with us, thinking we're free.
827                            (Thanks Jeremy Allison). */
828                         rec->magic = TDB_MAGIC;
829                         if (rec_write(tdb, rec_ptr, rec) == -1)
830                                 goto fail;
831
832                         /* Did we create new block? */
833                         if (newrec_ptr) {
834                                 /* Update allocated record tailer (we
835                                    shortened it). */
836                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
837                                         goto fail;
838
839                                 /* Free new record */
840                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
841                                         goto fail;
842                         }
843
844                         /* all done - return the new record offset */
845                         tdb_unlock(tdb, -1, F_WRLCK);
846                         return rec_ptr;
847                 }
848                 /* move to the next record */
849                 last_ptr = rec_ptr;
850                 rec_ptr = rec->next;
851         }
852         /* we didn't find enough space. See if we can expand the
853            database and if we can then try again */
854         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
855                 goto again;
856  fail:
857         tdb_unlock(tdb, -1, F_WRLCK);
858         return 0;
859 }
860
861 /* initialise a new database with a specified hash size */
862 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
863 {
864         struct tdb_header *newdb;
865         int size, ret = -1;
866
867         /* We make it up in memory, then write it out if not internal */
868         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
869         if (!(newdb = calloc(size, 1)))
870                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
871
872         /* Fill in the header */
873         newdb->version = TDB_VERSION;
874         newdb->hash_size = hash_size;
875 #ifdef USE_SPINLOCKS
876         newdb->rwlocks = size;
877 #endif
878         if (tdb->flags & TDB_INTERNAL) {
879                 tdb->map_size = size;
880                 tdb->map_ptr = (char *)newdb;
881                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
882                 /* Convert the `ondisk' version if asked. */
883                 CONVERT(*newdb);
884                 return 0;
885         }
886         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
887                 goto fail;
888
889         if (ftruncate(tdb->fd, 0) == -1)
890                 goto fail;
891
892         /* This creates an endian-converted header, as if read from disk */
893         CONVERT(*newdb);
894         memcpy(&tdb->header, newdb, sizeof(tdb->header));
895         /* Don't endian-convert the magic food! */
896         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
897         if (write(tdb->fd, newdb, size) != size)
898                 ret = -1;
899         else
900                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
901
902   fail:
903         SAFE_FREE(newdb);
904         return ret;
905 }
906
907 /* Returns 0 on fail.  On success, return offset of record, and fills
908    in rec */
909 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
910                         struct list_struct *r)
911 {
912         tdb_off rec_ptr;
913         
914         /* read in the hash top */
915         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
916                 return 0;
917
918         /* keep looking until we find the right record */
919         while (rec_ptr) {
920                 if (rec_read(tdb, rec_ptr, r) == -1)
921                         return 0;
922
923                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
924                         char *k;
925                         /* a very likely hit - read the key */
926                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
927                                            r->key_len);
928                         if (!k)
929                                 return 0;
930
931                         if (memcmp(key.dptr, k, key.dsize) == 0) {
932                                 SAFE_FREE(k);
933                                 return rec_ptr;
934                         }
935                         SAFE_FREE(k);
936                 }
937                 rec_ptr = r->next;
938         }
939         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
940 }
941
942 /* If they do lockkeys, check that this hash is one they locked */
943 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
944 {
945         u32 i;
946         if (!tdb->lockedkeys)
947                 return 1;
948         for (i = 0; i < tdb->lockedkeys[0]; i++)
949                 if (tdb->lockedkeys[i+1] == hash)
950                         return 1;
951         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
952 }
953
954 /* As tdb_find, but if you succeed, keep the lock */
955 static tdb_off tdb_find_lock(TDB_CONTEXT *tdb, TDB_DATA key, int locktype,
956                              struct list_struct *rec)
957 {
958         u32 hash, rec_ptr;
959
960         hash = tdb_hash(&key);
961         if (!tdb_keylocked(tdb, hash))
962                 return 0;
963         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
964                 return 0;
965         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
966                 tdb_unlock(tdb, BUCKET(hash), locktype);
967         return rec_ptr;
968 }
969
970 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
971 {
972         return tdb->ecode;
973 }
974
975 static struct tdb_errname {
976         enum TDB_ERROR ecode; const char *estring;
977 } emap[] = { {TDB_SUCCESS, "Success"},
978              {TDB_ERR_CORRUPT, "Corrupt database"},
979              {TDB_ERR_IO, "IO Error"},
980              {TDB_ERR_LOCK, "Locking error"},
981              {TDB_ERR_OOM, "Out of memory"},
982              {TDB_ERR_EXISTS, "Record exists"},
983              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
984              {TDB_ERR_NOEXIST, "Record does not exist"} };
985
986 /* Error string for the last tdb error */
987 const char *tdb_errorstr(TDB_CONTEXT *tdb)
988 {
989         u32 i;
990         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
991                 if (tdb->ecode == emap[i].ecode)
992                         return emap[i].estring;
993         return "Invalid error code";
994 }
995
996 /* update an entry in place - this only works if the new data size
997    is <= the old data size and the key exists.
998    on failure return -1
999 */
1000 static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
1001 {
1002         struct list_struct rec;
1003         tdb_off rec_ptr;
1004         int ret = -1;
1005
1006         /* find entry */
1007         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1008                 return -1;
1009
1010         /* must be long enough key, data and tailer */
1011         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1012                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1013                 goto out;
1014         }
1015
1016         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1017                       dbuf.dptr, dbuf.dsize) == -1)
1018                 goto out;
1019
1020         if (dbuf.dsize != rec.data_len) {
1021                 /* update size */
1022                 rec.data_len = dbuf.dsize;
1023                 ret = rec_write(tdb, rec_ptr, &rec);
1024         } else
1025                 ret = 0;
1026  out:
1027         tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK);
1028         return ret;
1029 }
1030
1031 /* find an entry in the database given a key */
1032 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1033 {
1034         tdb_off rec_ptr;
1035         struct list_struct rec;
1036         TDB_DATA ret;
1037
1038         /* find which hash bucket it is in */
1039         if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec)))
1040                 return tdb_null;
1041
1042         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1043                                   rec.data_len);
1044         ret.dsize = rec.data_len;
1045         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1046         return ret;
1047 }
1048
1049 /* check if an entry in the database exists 
1050
1051    note that 1 is returned if the key is found and 0 is returned if not found
1052    this doesn't match the conventions in the rest of this module, but is
1053    compatible with gdbm
1054 */
1055 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1056 {
1057         struct list_struct rec;
1058         
1059         if (tdb_find_lock(tdb, key, F_RDLCK, &rec) == 0)
1060                 return 0;
1061         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1062         return 1;
1063 }
1064
1065 /* record lock stops delete underneath */
1066 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1067 {
1068         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1069 }
1070 /*
1071   Write locks override our own fcntl readlocks, so check it here.
1072   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1073   an error to fail to get the lock here.
1074 */
1075  
1076 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1077 {
1078         struct tdb_traverse_lock *i;
1079         for (i = &tdb->travlocks; i; i = i->next)
1080                 if (i->off == off)
1081                         return -1;
1082         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1083 }
1084
1085 /*
1086   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1087   an error to fail to get the lock here.
1088 */
1089
1090 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1091 {
1092         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1093 }
1094 /* fcntl locks don't stack: avoid unlocking someone else's */
1095 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1096 {
1097         struct tdb_traverse_lock *i;
1098         u32 count = 0;
1099
1100         if (off == 0)
1101                 return 0;
1102         for (i = &tdb->travlocks; i; i = i->next)
1103                 if (i->off == off)
1104                         count++;
1105         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1106 }
1107
1108 /* actually delete an entry in the database given the offset */
1109 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1110 {
1111         tdb_off last_ptr, i;
1112         struct list_struct lastrec;
1113
1114         if (tdb->read_only) return -1;
1115
1116         if (write_lock_record(tdb, rec_ptr) == -1) {
1117                 /* Someone traversing here: mark it as dead */
1118                 rec->magic = TDB_DEAD_MAGIC;
1119                 return rec_write(tdb, rec_ptr, rec);
1120         }
1121         if (write_unlock_record(tdb, rec_ptr) != 0)
1122                 return -1;
1123
1124         /* find previous record in hash chain */
1125         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1126                 return -1;
1127         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1128                 if (rec_read(tdb, i, &lastrec) == -1)
1129                         return -1;
1130
1131         /* unlink it: next ptr is at start of record. */
1132         if (last_ptr == 0)
1133                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1134         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1135                 return -1;
1136
1137         /* recover the space */
1138         if (tdb_free(tdb, rec_ptr, rec) == -1)
1139                 return -1;
1140         return 0;
1141 }
1142
1143 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1144 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1145                          struct list_struct *rec)
1146 {
1147         int want_next = (tlock->off != 0);
1148
1149         /* No traversal allows if you've called tdb_lockkeys() */
1150         if (tdb->lockedkeys)
1151                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1152
1153         /* Lock each chain from the start one. */
1154         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1155                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1156                         return -1;
1157
1158                 /* No previous record?  Start at top of chain. */
1159                 if (!tlock->off) {
1160                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1161                                      &tlock->off) == -1)
1162                                 goto fail;
1163                 } else {
1164                         /* Otherwise unlock the previous record. */
1165                         if (unlock_record(tdb, tlock->off) != 0)
1166                                 goto fail;
1167                 }
1168
1169                 if (want_next) {
1170                         /* We have offset of old record: grab next */
1171                         if (rec_read(tdb, tlock->off, rec) == -1)
1172                                 goto fail;
1173                         tlock->off = rec->next;
1174                 }
1175
1176                 /* Iterate through chain */
1177                 while( tlock->off) {
1178                         tdb_off current;
1179                         if (rec_read(tdb, tlock->off, rec) == -1)
1180                                 goto fail;
1181                         if (!TDB_DEAD(rec)) {
1182                                 /* Woohoo: we found one! */
1183                                 if (lock_record(tdb, tlock->off) != 0)
1184                                         goto fail;
1185                                 return tlock->off;
1186                         }
1187                         /* Try to clean dead ones from old traverses */
1188                         current = tlock->off;
1189                         tlock->off = rec->next;
1190                         if (do_delete(tdb, current, rec) != 0)
1191                                 goto fail;
1192                 }
1193                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1194                 want_next = 0;
1195         }
1196         /* We finished iteration without finding anything */
1197         return TDB_ERRCODE(TDB_SUCCESS, 0);
1198
1199  fail:
1200         tlock->off = 0;
1201         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1202                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1203         return -1;
1204 }
1205
1206 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1207    return -1 on error or the record count traversed
1208    if fn is NULL then it is not called
1209    a non-zero return value from fn() indicates that the traversal should stop
1210   */
1211 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
1212 {
1213         TDB_DATA key, dbuf;
1214         struct list_struct rec;
1215         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1216         int ret, count = 0;
1217
1218         /* This was in the initializaton, above, but the IRIX compiler
1219          * did not like it.  crh
1220          */
1221         tl.next = tdb->travlocks.next;
1222
1223         /* fcntl locks don't stack: beware traverse inside traverse */
1224         tdb->travlocks.next = &tl;
1225
1226         /* tdb_next_lock places locks on the record returned, and its chain */
1227         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1228                 count++;
1229                 /* now read the full record */
1230                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1231                                           rec.key_len + rec.data_len);
1232                 if (!key.dptr) {
1233                         ret = -1;
1234                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1235                                 goto out;
1236                         if (unlock_record(tdb, tl.off) != 0)
1237                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1238                         goto out;
1239                 }
1240                 key.dsize = rec.key_len;
1241                 dbuf.dptr = key.dptr + rec.key_len;
1242                 dbuf.dsize = rec.data_len;
1243
1244                 /* Drop chain lock, call out */
1245                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1246                         ret = -1;
1247                         goto out;
1248                 }
1249                 if (fn && fn(tdb, key, dbuf, state)) {
1250                         /* They want us to terminate traversal */
1251                         ret = count;
1252                         if (unlock_record(tdb, tl.off) != 0) {
1253                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1254                                 ret = -1;
1255                         }
1256                         tdb->travlocks.next = tl.next;
1257                         SAFE_FREE(key.dptr);
1258                         return count;
1259                 }
1260                 SAFE_FREE(key.dptr);
1261         }
1262 out:
1263         tdb->travlocks.next = tl.next;
1264         if (ret < 0)
1265                 return -1;
1266         else
1267                 return count;
1268 }
1269
1270 /* find the first entry in the database and return its key */
1271 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1272 {
1273         TDB_DATA key;
1274         struct list_struct rec;
1275
1276         /* release any old lock */
1277         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1278                 return tdb_null;
1279         tdb->travlocks.off = tdb->travlocks.hash = 0;
1280
1281         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1282                 return tdb_null;
1283         /* now read the key */
1284         key.dsize = rec.key_len;
1285         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1286         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1287                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1288         return key;
1289 }
1290
1291 /* find the next entry in the database, returning its key */
1292 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1293 {
1294         u32 oldhash;
1295         TDB_DATA key = tdb_null;
1296         struct list_struct rec;
1297         char *k = NULL;
1298
1299         /* Is locked key the old key?  If so, traverse will be reliable. */
1300         if (tdb->travlocks.off) {
1301                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1302                         return tdb_null;
1303                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1304                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1305                                             rec.key_len))
1306                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1307                         /* No, it wasn't: unlock it and start from scratch */
1308                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1309                                 return tdb_null;
1310                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1311                                 return tdb_null;
1312                         tdb->travlocks.off = 0;
1313                 }
1314
1315                 SAFE_FREE(k);
1316         }
1317
1318         if (!tdb->travlocks.off) {
1319                 /* No previous element: do normal find, and lock record */
1320                 tdb->travlocks.off = tdb_find_lock(tdb, oldkey, F_WRLCK, &rec);
1321                 if (!tdb->travlocks.off)
1322                         return tdb_null;
1323                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1324                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1325                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1326                         return tdb_null;
1327                 }
1328         }
1329         oldhash = tdb->travlocks.hash;
1330
1331         /* Grab next record: locks chain and returned record,
1332            unlocks old record */
1333         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1334                 key.dsize = rec.key_len;
1335                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1336                                           key.dsize);
1337                 /* Unlock the chain of this new record */
1338                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1339                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1340         }
1341         /* Unlock the chain of old record */
1342         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1343                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1344         return key;
1345 }
1346
1347 /* delete an entry in the database given a key */
1348 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1349 {
1350         tdb_off rec_ptr;
1351         struct list_struct rec;
1352         int ret;
1353
1354         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1355                 return -1;
1356         ret = do_delete(tdb, rec_ptr, &rec);
1357         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1358                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1359         return ret;
1360 }
1361
1362 /* store an element in the database, replacing any existing element
1363    with the same key 
1364
1365    return 0 on success, -1 on failure
1366 */
1367 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1368 {
1369         struct list_struct rec;
1370         u32 hash;
1371         tdb_off rec_ptr;
1372         char *p = NULL;
1373         int ret = 0;
1374
1375         /* find which hash bucket it is in */
1376         hash = tdb_hash(&key);
1377         if (!tdb_keylocked(tdb, hash))
1378                 return -1;
1379         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1380                 return -1;
1381
1382         /* check for it existing, on insert. */
1383         if (flag == TDB_INSERT) {
1384                 if (tdb_exists(tdb, key)) {
1385                         tdb->ecode = TDB_ERR_EXISTS;
1386                         goto fail;
1387                 }
1388         } else {
1389                 /* first try in-place update, on modify or replace. */
1390                 if (tdb_update(tdb, key, dbuf) == 0)
1391                         goto out;
1392                 if (flag == TDB_MODIFY && tdb->ecode == TDB_ERR_NOEXIST)
1393                         goto fail;
1394         }
1395         /* reset the error code potentially set by the tdb_update() */
1396         tdb->ecode = TDB_SUCCESS;
1397
1398         /* delete any existing record - if it doesn't exist we don't
1399            care.  Doing this first reduces fragmentation, and avoids
1400            coalescing with `allocated' block before it's updated. */
1401         if (flag != TDB_INSERT)
1402                 tdb_delete(tdb, key);
1403
1404         /* Copy key+value *before* allocating free space in case malloc
1405            fails and we are left with a dead spot in the tdb. */
1406
1407         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1408                 tdb->ecode = TDB_ERR_OOM;
1409                 goto fail;
1410         }
1411
1412         memcpy(p, key.dptr, key.dsize);
1413         memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1414
1415         /* now we're into insert / modify / replace of a record which
1416          * we know could not be optimised by an in-place store (for
1417          * various reasons).  */
1418         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1419                 goto fail;
1420
1421         /* Read hash top into next ptr */
1422         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1423                 goto fail;
1424
1425         rec.key_len = key.dsize;
1426         rec.data_len = dbuf.dsize;
1427         rec.full_hash = hash;
1428         rec.magic = TDB_MAGIC;
1429
1430         /* write out and point the top of the hash chain at it */
1431         if (rec_write(tdb, rec_ptr, &rec) == -1
1432             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1433             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1434                 /* Need to tdb_unallocate() here */
1435                 goto fail;
1436         }
1437  out:
1438         SAFE_FREE(p); 
1439         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1440         return ret;
1441 fail:
1442         ret = -1;
1443         goto out;
1444 }
1445
1446 static int tdb_already_open(dev_t device,
1447                             ino_t ino)
1448 {
1449         TDB_CONTEXT *i;
1450         
1451         for (i = tdbs; i; i = i->next) {
1452                 if (i->device == device && i->inode == ino) {
1453                         return 1;
1454                 }
1455         }
1456
1457         return 0;
1458 }
1459
1460 /* open the database, creating it if necessary 
1461
1462    The open_flags and mode are passed straight to the open call on the
1463    database file. A flags value of O_WRONLY is invalid. The hash size
1464    is advisory, use zero for a default value.
1465
1466    Return is NULL on error, in which case errno is also set.  Don't 
1467    try to call tdb_error or tdb_errname, just do strerror(errno).
1468
1469    @param name may be NULL for internal databases. */
1470 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1471                       int open_flags, mode_t mode)
1472 {
1473         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL);
1474 }
1475
1476
1477 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1478                          int open_flags, mode_t mode,
1479                          tdb_log_func log_fn)
1480 {
1481         TDB_CONTEXT *tdb;
1482         struct stat st;
1483         int rev = 0, locked;
1484         unsigned char *vp;
1485         u32 vertest;
1486
1487         if (!(tdb = calloc(1, sizeof *tdb))) {
1488                 /* Can't log this */
1489                 errno = ENOMEM;
1490                 goto fail;
1491         }
1492         tdb->fd = -1;
1493         tdb->name = NULL;
1494         tdb->map_ptr = NULL;
1495         tdb->lockedkeys = NULL;
1496         tdb->flags = tdb_flags;
1497         tdb->open_flags = open_flags;
1498         tdb->log_fn = log_fn;
1499         
1500         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1501                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1502                          name));
1503                 errno = EINVAL;
1504                 goto fail;
1505         }
1506         
1507         if (hash_size == 0)
1508                 hash_size = DEFAULT_HASH_SIZE;
1509         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1510                 tdb->read_only = 1;
1511                 /* read only databases don't do locking or clear if first */
1512                 tdb->flags |= TDB_NOLOCK;
1513                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1514         }
1515
1516         /* internal databases don't mmap or lock, and start off cleared */
1517         if (tdb->flags & TDB_INTERNAL) {
1518                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1519                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1520                 if (tdb_new_database(tdb, hash_size) != 0) {
1521                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1522                         goto fail;
1523                 }
1524                 goto internal;
1525         }
1526
1527         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1528                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1529                          name, strerror(errno)));
1530                 goto fail;      /* errno set by open(2) */
1531         }
1532
1533         /* ensure there is only one process initialising at once */
1534         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1535                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1536                          name, strerror(errno)));
1537                 goto fail;      /* errno set by tdb_brlock */
1538         }
1539
1540         /* we need to zero database if we are the only one with it open */
1541         if ((locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1542             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1543                 open_flags |= O_CREAT;
1544                 if (ftruncate(tdb->fd, 0) == -1) {
1545                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1546                                  "failed to truncate %s: %s\n",
1547                                  name, strerror(errno)));
1548                         goto fail; /* errno set by ftruncate */
1549                 }
1550         }
1551
1552         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1553             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1554             || (tdb->header.version != TDB_VERSION
1555                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1556                 /* its not a valid database - possibly initialise it */
1557                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1558                         errno = EIO; /* ie bad format or something */
1559                         goto fail;
1560                 }
1561                 rev = (tdb->flags & TDB_CONVERT);
1562         }
1563         vp = (unsigned char *)&tdb->header.version;
1564         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1565                   (((u32)vp[2]) << 8) | (u32)vp[3];
1566         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1567         if (!rev)
1568                 tdb->flags &= ~TDB_CONVERT;
1569         else {
1570                 tdb->flags |= TDB_CONVERT;
1571                 convert(&tdb->header, sizeof(tdb->header));
1572         }
1573         if (fstat(tdb->fd, &st) == -1)
1574                 goto fail;
1575
1576         /* Is it already in the open list?  If so, fail. */
1577         if (tdb_already_open(st.st_dev, st.st_ino)) {
1578                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1579                          "%s (%d,%d) is already open in this process\n",
1580                          name, st.st_dev, st.st_ino));
1581                 errno = EBUSY;
1582                 goto fail;
1583         }
1584
1585         if (!(tdb->name = (char *)strdup(name))) {
1586                 errno = ENOMEM;
1587                 goto fail;
1588         }
1589
1590         tdb->map_size = st.st_size;
1591         tdb->device = st.st_dev;
1592         tdb->inode = st.st_ino;
1593         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1594         if (!tdb->locked) {
1595                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1596                          "failed to allocate lock structure for %s\n",
1597                          name));
1598                 errno = ENOMEM;
1599                 goto fail;
1600         }
1601         tdb_mmap(tdb);
1602         if (locked) {
1603                 if (!tdb->read_only)
1604                         if (tdb_clear_spinlocks(tdb) != 0) {
1605                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1606                                 "failed to clear spinlock\n"));
1607                                 goto fail;
1608                         }
1609                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1610                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1611                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1612                                  name, strerror(errno)));
1613                         goto fail;
1614                 }
1615         }
1616         /* leave this lock in place to indicate it's in use */
1617         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1618                 goto fail;
1619
1620  internal:
1621         /* Internal (memory-only) databases skip all the code above to
1622          * do with disk files, and resume here by releasing their
1623          * global lock and hooking into the active list. */
1624         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1625                 goto fail;
1626         tdb->next = tdbs;
1627         tdbs = tdb;
1628         return tdb;
1629
1630  fail:
1631         { int save_errno = errno;
1632
1633         if (!tdb)
1634                 return NULL;
1635         
1636         if (tdb->map_ptr) {
1637                 if (tdb->flags & TDB_INTERNAL)
1638                         SAFE_FREE(tdb->map_ptr);
1639                 else
1640                         tdb_munmap(tdb);
1641         }
1642         SAFE_FREE(tdb->name);
1643         if (tdb->fd != -1)
1644                 if (close(tdb->fd) != 0)
1645                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1646         SAFE_FREE(tdb->locked);
1647         SAFE_FREE(tdb);
1648         errno = save_errno;
1649         return NULL;
1650         }
1651 }
1652
1653 /* close a database */
1654 int tdb_close(TDB_CONTEXT *tdb)
1655 {
1656         TDB_CONTEXT **i;
1657         int ret = 0;
1658
1659         if (tdb->map_ptr) {
1660                 if (tdb->flags & TDB_INTERNAL)
1661                         SAFE_FREE(tdb->map_ptr);
1662                 else
1663                         tdb_munmap(tdb);
1664         }
1665         SAFE_FREE(tdb->name);
1666         if (tdb->fd != -1)
1667                 ret = close(tdb->fd);
1668         SAFE_FREE(tdb->locked);
1669         SAFE_FREE(tdb->lockedkeys);
1670
1671         /* Remove from contexts list */
1672         for (i = &tdbs; *i; i = &(*i)->next) {
1673                 if (*i == tdb) {
1674                         *i = tdb->next;
1675                         break;
1676                 }
1677         }
1678
1679         memset(tdb, 0, sizeof(*tdb));
1680         SAFE_FREE(tdb);
1681
1682         return ret;
1683 }
1684
1685 /* lock/unlock entire database */
1686 int tdb_lockall(TDB_CONTEXT *tdb)
1687 {
1688         u32 i;
1689
1690         /* There are no locks on read-only dbs */
1691         if (tdb->read_only)
1692                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1693         if (tdb->lockedkeys)
1694                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1695         for (i = 0; i < tdb->header.hash_size; i++) 
1696                 if (tdb_lock(tdb, i, F_WRLCK))
1697                         break;
1698
1699         /* If error, release locks we have... */
1700         if (i < tdb->header.hash_size) {
1701                 u32 j;
1702
1703                 for ( j = 0; j < i; j++)
1704                         tdb_unlock(tdb, j, F_WRLCK);
1705                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1706         }
1707
1708         return 0;
1709 }
1710 void tdb_unlockall(TDB_CONTEXT *tdb)
1711 {
1712         u32 i;
1713         for (i=0; i < tdb->header.hash_size; i++)
1714                 tdb_unlock(tdb, i, F_WRLCK);
1715 }
1716
1717 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1718 {
1719         u32 i, j, hash;
1720
1721         /* Can't lock more keys if already locked */
1722         if (tdb->lockedkeys)
1723                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1724         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1725                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1726         /* First number in array is # keys */
1727         tdb->lockedkeys[0] = number;
1728
1729         /* Insertion sort by bucket */
1730         for (i = 0; i < number; i++) {
1731                 hash = tdb_hash(&keys[i]);
1732                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1733                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1734                 tdb->lockedkeys[j+1] = hash;
1735         }
1736         /* Finally, lock in order */
1737         for (i = 0; i < number; i++)
1738                 if (tdb_lock(tdb, i, F_WRLCK))
1739                         break;
1740
1741         /* If error, release locks we have... */
1742         if (i < number) {
1743                 for ( j = 0; j < i; j++)
1744                         tdb_unlock(tdb, j, F_WRLCK);
1745                 SAFE_FREE(tdb->lockedkeys);
1746                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1747         }
1748         return 0;
1749 }
1750
1751 /* Unlock the keys previously locked by tdb_lockkeys() */
1752 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1753 {
1754         u32 i;
1755         for (i = 0; i < tdb->lockedkeys[0]; i++)
1756                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
1757         SAFE_FREE(tdb->lockedkeys);
1758 }
1759
1760 /* lock/unlock one hash chain. This is meant to be used to reduce
1761    contention - it cannot guarantee how many records will be locked */
1762 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1763 {
1764         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1765 }
1766
1767 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1768 {
1769         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1770 }
1771
1772
1773 /* register a loging function */
1774 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
1775 {
1776         tdb->log_fn = fn;
1777 }
1778
1779
1780 /* reopen a tdb - this is used after a fork to ensure that we have an independent
1781    seek pointer from our parent and to re-establish locks */
1782 int tdb_reopen(TDB_CONTEXT *tdb)
1783 {
1784         struct stat st;
1785
1786         if (tdb_munmap(tdb) != 0) {
1787                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
1788                 goto fail;
1789         }
1790         if (close(tdb->fd) != 0)
1791                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
1792         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
1793         if (tdb->fd == -1) {
1794                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
1795                 goto fail;
1796         }
1797         if (fstat(tdb->fd, &st) != 0) {
1798                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
1799                 goto fail;
1800         }
1801         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
1802                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
1803                 goto fail;
1804         }
1805         tdb_mmap(tdb);
1806         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1) {
1807                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
1808                 goto fail;
1809         }
1810
1811         return 0;
1812
1813 fail:
1814         tdb_close(tdb);
1815         return -1;
1816 }
1817
1818 /* reopen all tdb's */
1819 int tdb_reopen_all(void)
1820 {
1821         TDB_CONTEXT *tdb;
1822
1823         for (tdb=tdbs; tdb; tdb = tdb->next) {
1824                 if (tdb_reopen(tdb) != 0) return -1;
1825         }
1826
1827         return 0;
1828 }