3
3
#define _GNU_SOURCE 1 /* feature test macro so that RTLD_NEXT will be available */
4
4
#endif
5
5
6
+ #include "collfs.h"
6
7
// start dl-load
7
8
8
9
#if COLLFS_IN_LIBC
@@ -61,6 +62,7 @@ static void set_errno(int e) { errno = e; }
61
62
struct FileLink {
62
63
MPI_Comm comm ;
63
64
int fd ;
65
+ int refct ;
64
66
char fname [MAXPATHLEN ];
65
67
void * mem ;
66
68
size_t len ;
@@ -71,13 +73,21 @@ static struct FileLink *DLOpenFiles;
71
73
static const int BaseFD = 10000 ;
72
74
static int NextFD = 10001 ;
73
75
76
/* One node per region handed out by __collfs_mmap for a collective fd; nodes form a singly linked list. */
+ struct MMapLink {
77
+ void * addr ; /* address returned to the mmap caller (file buffer base + offset); __collfs_munmap matches on it */
78
+ size_t len ; /* length requested at mmap time; munmap must be called with the same length */
79
+ int fd ; /* collective fd the region belongs to; passed to __collfs_close on unmap */
80
+ struct MMapLink * next ; /* next node in the list, NULL-terminated */
81
+ };
82
+ static struct MMapLink * MMapRegions ; /* list head; __collfs_mmap pushes new nodes at the front */
83
+
74
84
struct CommLink {
75
85
MPI_Comm comm ;
76
86
struct CommLink * next ;
77
87
};
78
88
static struct CommLink * CommStack ;
79
89
80
- /* Not collective, but changes the communicator on which future IO is collective */
90
+ /* Logically collective, changes the communicator on which future IO is collective */
81
91
int __collfs_comm_push (MPI_Comm comm )
82
92
{
83
93
struct CommLink * link ;
@@ -86,14 +96,20 @@ int __collfs_comm_push(MPI_Comm comm)
86
96
link -> comm = comm ;
87
97
link -> next = CommStack ;
88
98
CommStack = link ;
99
+ #if DEBUG
100
+ MPI_Barrier (link -> comm );
101
+ #endif
89
102
return 0 ;
90
103
}
91
- /* Not collective, but changes the communicator on which future IO is collective */
104
+ /* Logically collective, changes the communicator on which future IO is collective */
92
105
int __collfs_comm_pop (void )
93
106
{
94
107
struct CommLink * link = CommStack ;
95
108
if (!link ) return -1 ;
96
109
CommStack = link -> next ;
110
+ #if DEBUG
111
+ MPI_Barrier (link -> comm );
112
+ #endif
97
113
free (link );
98
114
return 0 ;
99
115
}
@@ -179,65 +195,125 @@ int __collfs_xstat64(int vers, const char *file, struct stat64 *buf)
179
195
void * __collfs_mmap (void * addr , size_t len , int prot , int flags ,
180
196
int fildes , off_t off )
181
197
{
182
- int rank = 0 ;
183
- if (MPI_Initialized ) {
198
+ struct FileLink * link ;
199
+ for (link = DLOpenFiles ; link ; link = link -> next ) {
200
+ if (link -> fd == fildes ) {
201
+ int err ;
184
202
#if DEBUG
185
- stderr_printf ("[%x] mmap(fd:%x @%x,%x,%x,%x,%x)\n" , rank , fildes , (int )(intptr_t )addr , (int )len , prot , flags , (int )off );
186
- #endif
187
- stderr_printf ("__collfs_mmap has not been implemented yet! (passing through)\n" );
188
- return mmap (addr , len , prot , flags , fildes , off );
203
+ int rank ;
204
+ err = MPI_Comm_rank (link -> comm , & rank );
205
+ if (err ) {
206
+ set_errno (EPROTO );
207
+ return MAP_FAILED ;
208
+ }
209
+ stderr_printf ("[%x] mmap(fd:%x @%x,%x,%x,%x,%x)\n" , rank , fildes , (int )(intptr_t )addr , (int )len , prot , flags , (int )off );
210
+ #endif
211
+ struct MMapLink * mlink ;
212
+
213
+ if (prot != PROT_READ && prot != (PROT_READ | PROT_EXEC )) {
214
+ set_errno (EACCES );
215
+ return MAP_FAILED ;
216
+ }
217
+ if (flags & MAP_FIXED ) { /* Not implemented due to laziness */
218
+ set_errno (ENOTSUP );
219
+ return MAP_FAILED ;
220
+ }
221
+ if (flags != MAP_PRIVATE ) { /* Cannot do MAP_SHARED for a collective fd */
222
+ set_errno (ENOTSUP );
223
+ return MAP_FAILED ;
224
+ }
225
+ if (off < 0 ) {
226
+ set_errno (ENXIO );
227
+ return MAP_FAILED ;
228
+ }
229
+ if (off + len > link -> len ) {
230
+ set_errno (EOVERFLOW );
231
+ return MAP_FAILED ;
232
+ }
233
+ mlink = malloc (sizeof * mlink );
234
+ mlink -> addr = (char * )link -> mem + off ;
235
+ mlink -> len = len ;
236
+ mlink -> fd = fildes ;
237
+ mlink -> next = MMapRegions ;
238
+ MMapRegions = mlink ;
239
+ link -> refct ++ ;
240
+ return mlink -> addr ;
241
+ }
189
242
}
190
- else {
191
243
#if DEBUG
192
- stderr_printf ("[NO_MPI] mmap(fd:%x @%x,%x,%x,%x,%x)\n" , fildes , (int )(intptr_t )addr , (int )len , prot , flags , (int )off );
193
- #endif
194
- return mmap (addr , len , prot , flags , fildes , off );
195
- }
244
+ stderr_printf ("[NO_MPI] mmap(fd:%x @%x,%x,%x,%x,%x)\n" , fildes , (int )(intptr_t )addr , (int )len , prot , flags , (int )off );
245
+ #endif
246
+ return mmap (addr , len , prot , flags , fildes , off );
196
247
}
197
248
198
249
/* Not collective */
199
250
int __collfs_munmap (__ptr_t addr , size_t len )
200
251
{
201
- int rank = 0 ;
202
- if (MPI_Initialized ) {
252
+ if (CommStack ) {
203
253
#if DEBUG
254
+ int rank ;
255
+ MPI_Comm_rank (MPI_COMM_WORLD , & rank );
204
256
stderr_printf ("[%x] munmap(@%x,%x)\n" , rank , (int )(intptr_t )addr , (int )len );
205
- #endif
206
- stderr_printf ("__collfs_munmap has not been implemented yet! (passing through)\n" );
207
- return munmap (addr , len );
208
- }
209
- else {
257
+ #endif
258
+ struct MMapLink * mlink ;
259
+ for (mlink = MMapRegions ; mlink ; mlink = mlink -> next ) {
260
+ if (mlink -> addr == addr ) {
261
+ int fd = mlink -> fd ;
262
+ if (mlink -> len != len ) {
210
263
#if DEBUG
211
- stderr_printf ("[NO_MPI] munmap(@%x,%x)\n" , (int )(intptr_t )addr , (int )len );
212
- #endif
213
- return munmap (addr , len );
264
+ stderr_printf ("[%x) Attempt to unmap region of length %x when %x was mapped\n" , rank , (int )len , (int )mlink -> len );
265
+ #endif
266
+ set_errno (EINVAL );
267
+ return -1 ;
268
+ }
269
+ free (mlink );
270
+ return __collfs_close (fd );
271
+ }
272
+ }
214
273
}
274
+ #if DEBUG
275
+ stderr_printf ("[NO_MPI] munmap(@%x,%x)\n" , (int )(intptr_t )addr , (int )len );
276
+ #endif
277
+ return munmap (addr , len );
215
278
}
216
279
217
280
218
281
off_t __collfs_lseek (int fildes , off_t offset , int whence )
219
282
{
220
- int rank = 0 ;
221
- if (MPI_Initialized ) {
283
+ struct FileLink * link ;
284
+
285
+ for (link = DLOpenFiles ; link ; link = link -> next ) {
286
+ if (link -> fd == fildes ) {
287
+ int rank = 0 ;
288
+ MPI_Comm_rank (link -> comm ,& rank );
222
289
#if DEBUG
223
- stderr_printf ("[%x] lseek(fd:%x,%x,%x)\n" ,rank ,fildes ,(int )offset ,whence );
224
- #endif
225
- stderr_printf ("__collfs_lseek has not been implemented yet! (passing through)\n" );
226
- return __lseek (fildes , offset , whence );
290
+ stderr_printf ("[%x] lseek(fd:%x,%x,%x)\n" ,rank ,fildes ,(int )offset ,whence );
291
+ #endif
292
+ if (!rank ) return __lseek (fildes , offset , whence ); /* Rank 0 has a normal fd */
293
+ switch (whence ) {
294
+ case SEEK_SET :
295
+ link -> offset = offset ;
296
+ break ;
297
+ case SEEK_CUR :
298
+ link -> offset += offset ;
299
+ break ;
300
+ case SEEK_END :
301
+ link -> offset = link -> len + offset ;
302
+ break ;
303
+ }
304
+ return link -> offset ;
305
+ }
227
306
}
228
- else {
229
307
#if DEBUG
230
- stderr_printf ("[NO_MPI] lseek(fd:%x,%x,%x)\n" ,fildes ,(int )offset ,whence );
231
- #endif
232
- return __lseek (fildes , offset , whence );
233
- }
308
+ stderr_printf ("[NO_MPI] lseek(fd:%x,%x,%x)\n" ,fildes ,(int )offset ,whence );
309
+ #endif
310
+ return __lseek (fildes , offset , whence );
234
311
}
235
312
236
-
237
313
int __collfs_open (const char * pathname , int flags ,...)
238
314
{
239
315
mode_t mode = 0 ;
240
- int err ,rank = 0 , initialized ;
316
+ int err , rank , initialized ;
241
317
242
318
if (flags & O_CREAT ) {
243
319
va_list ap ;
@@ -270,6 +346,11 @@ int __collfs_open(const char *pathname, int flags,...)
270
346
return -1 ;
271
347
}
272
348
349
+ err = MPI_Comm_rank (CommStack -> comm , & rank );
350
+ if (err ) {
351
+ set_errno (ECOLLFS );
352
+ return -1 ;
353
+ }
273
354
#if DEBUG
274
355
fprintf (stderr , "[%d] open(\"%s\",%d,%d)\n" , rank , pathname , flags , mode );
275
356
#endif
@@ -287,6 +368,7 @@ int __collfs_open(const char *pathname, int flags,...)
287
368
else len = (int )fdst .st_size ; /* Cast prevents using large files, but MPI would need workarounds too */
288
369
}
289
370
}
371
+ MPI_Barrier (CommStack -> comm );
290
372
err = MPI_Bcast (& len , 1 ,MPI_INT , 0 , CommStack -> comm ); if (err ) return -1 ;
291
373
if (len < 0 ) return -1 ;
292
374
mem = NULL ;
@@ -299,8 +381,7 @@ int __collfs_open(const char *pathname, int flags,...)
299
381
#if DEBUG
300
382
if (fd < 0 ) stderr_printf ("could not shm_open because of \n" );
301
383
#endif
302
- if (fd >= 0 )
303
-
384
+ if (fd >= 0 )
304
385
/* Make sure everyone found memory */
305
386
gotmem = !!mem ;
306
387
err = MPI_Allreduce (MPI_IN_PLACE , & gotmem , 1 , MPI_INT , MPI_LAND , CommStack -> comm );
@@ -318,6 +399,7 @@ int __collfs_open(const char *pathname, int flags,...)
318
399
link = malloc (sizeof * link );
319
400
link -> comm = CommStack -> comm ;
320
401
link -> fd = fd ;
402
+ link -> refct = 1 ;
321
403
strcpy (link -> fname , pathname );
322
404
link -> mem = mem ;
323
405
link -> len = len ;
@@ -337,37 +419,27 @@ int __collfs_open(const char *pathname, int flags,...)
337
419
int __collfs_close (int fd )
338
420
{
339
421
struct FileLink * * linkp ;
340
- int err ,initialized ;
341
- int rank = 0 ;
342
-
343
- // pass through to libc __close if MPI has not been loaded yet
344
- if (MPI_Initialized ) {
345
- err = MPI_Initialized (& initialized ); if (err ) return -1 ;
346
- #if DEBUG
347
- if (initialized ) {err = MPI_Comm_rank (MPI_COMM_WORLD ,& rank ); if (err ) return -1 ;}
348
- stderr_printf ("[%x] close(fd:%x)\n" ,rank ,fd );
349
- #endif
350
- }
351
- else {
352
- #if DEBUG
353
- stderr_printf ("[NO_MPI] close(fd:%x)\n" ,fd );
354
- #endif
355
- return __close (fd );
356
- }
422
+ int err ;
357
423
358
424
for (linkp = & DLOpenFiles ; linkp && * linkp ; linkp = & (* linkp )-> next ) {
359
425
struct FileLink * link = * linkp ;
360
426
if (link -> fd == fd ) { /* remove it from the list */
361
- int rank = 0 , xerr = 0 ;
427
+ int rank = 0 , xerr = 0 , initialized ;
362
428
429
+ #if DEBUG
430
+ err = MPI_Comm_rank (MPI_COMM_WORLD ,& rank ); if (err ) return -1 ;
431
+ stderr_printf ("[%x] close(fd:%x)\n" ,rank ,fd );
432
+ #endif
433
+ if (-- link -> refct > 0 ) return 0 ;
434
+ err = MPI_Initialized (& initialized ); if (err ) return -1 ;
363
435
if (!initialized ) {
364
436
#if DEBUG
365
437
stderr_printf ("Attempt to close open collective fd, but MPI is not initialized. Perhaps it was finalized early?\n" );
366
438
#endif
367
439
set_errno (ECOLLFS );
368
440
return -1 ;
369
441
}
370
- err = MPI_Comm_rank (CommStack -> comm , & rank ); if (err ) return -1 ;
442
+ err = MPI_Comm_rank (CommStack ? CommStack -> comm : MPI_COMM_WORLD , & rank ); if (err ) return -1 ;
371
443
if (!rank ) {
372
444
munmap (link -> mem , link -> len );
373
445
xerr = __close (fd );
@@ -379,6 +451,9 @@ int __collfs_close(int fd)
379
451
return xerr ;
380
452
}
381
453
}
454
+ #if DEBUG
455
+ stderr_printf ("[NO_MPI] close(fd:%x)\n" ,fd );
456
+ #endif
382
457
return __close (fd );
383
458
}
384
459
@@ -394,6 +469,9 @@ ssize_t __collfs_read(int fd, void *buf, size_t count)
394
469
err = MPI_Initialized (& initialized ); if (err ) return -1 ;
395
470
if (initialized ) {err = MPI_Comm_rank (link -> comm , & rank ); if (err ) return -1 ;}
396
471
if (fd == link -> fd ) {
472
+ #if DEBUG > 1
473
+ stderr_printf ("[%x] read(%x,%x,%x)\n" , rank , fd , (unsigned )(uintptr_t )buf , (unsigned )count );
474
+ #endif
397
475
if (!rank ) return __read (fd , buf , count );
398
476
else {
399
477
if ((link -> len - link -> offset ) < count ) count = link -> len - link -> offset ;
@@ -403,6 +481,9 @@ ssize_t __collfs_read(int fd, void *buf, size_t count)
403
481
}
404
482
}
405
483
}
484
+ #if DEBUG > 1
485
+ stderr_printf ("[NO_MPI] read(%x,%x,%x)\n" , fd , (unsigned )(uintptr_t )buf , (unsigned )count );
486
+ #endif
406
487
return __read (fd , buf , count );
407
488
}
408
489
0 commit comments