diff --git a/A2/fhistogram-mt b/A2/fhistogram-mt new file mode 100755 index 0000000..3e8a058 Binary files /dev/null and b/A2/fhistogram-mt differ diff --git a/A2/fhistogram-mt.c b/A2/fhistogram-mt.c index 25f28db..6dcd041 100644 --- a/A2/fhistogram-mt.c +++ b/A2/fhistogram-mt.c @@ -22,6 +22,57 @@ pthread_mutex_t stdout_mutex = PTHREAD_MUTEX_INITIALIZER; #include "histogram.h" +int global_histogram[8] = { 0 }; + +struct job_queue q; + +pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + +void update_histogram_mt(int local_histogram[8]) { + pthread_mutex_lock(&mutex); + merge_histogram(local_histogram, global_histogram); + print_histogram(global_histogram); + pthread_mutex_unlock(&mutex); + +} + +int fhistogram_mt(char const *path) { + FILE *f = fopen(path, "r"); + + int local_histogram[8] = { 0 }; + + if (f == NULL) { + fflush(stdout); + warn("failed to open %s", path); + return -1; + } + + int i = 0; + + char c; + while (fread(&c, sizeof(c), 1, f) == 1) { + i++; + update_histogram(local_histogram, c); + if ((i % 100000) == 0) { + update_histogram_mt(local_histogram); + } + } + + fclose(f); + + update_histogram_mt(local_histogram); + + return 0; +} + +void* fhistogram_thread() { + char const* path; + while (job_queue_pop(&q, (void**) &path) == 0) { + fhistogram_mt(path); + } + return NULL; +} + int main(int argc, char * const *argv) { if (argc < 2) { err(1, "usage: paths..."); @@ -49,7 +100,14 @@ int main(int argc, char * const *argv) { paths = &argv[1]; } - assert(0); // Initialise the job queue and some worker threads here. + + job_queue_init(&q, 64); + pthread_t threads[num_threads]; + + for (int i = 0 ; i < num_threads ; i++) { + //printf("Initializing thread %i\n", i); + pthread_create(&threads[i], NULL, fhistogram_thread, NULL); + } // FTS_LOGICAL = follow symbolic links // FTS_NOCHDIR = do not change the working directory of the process @@ -70,7 +128,7 @@ int main(int argc, char * const *argv) { case FTS_D: break; case FTS_F: - assert(0); // Process the file p->fts_path, somehow. + job_queue_push(&q, strdup(p->fts_path)); break; default: break; @@ -79,7 +137,12 @@ int main(int argc, char * const *argv) { fts_close(ftsp); - assert(0); // Shut down the job queue and the worker threads here. + job_queue_destroy(&q); + //printf("done destroying jobqueue\n"); + + for (int i = 0 ; i < num_threads ; i++) { + pthread_join(threads[i], NULL); + } move_lines(9);