//note: How to run and test: list the names of all the .txt files you want to test in a single text file named "filename_list.txt".
// Put this .c file and filename_list.txt in the same folder, then compile and run.
/*
 * Merge word counts from the per-bucket chains into the combined table
 * at index FILENAME_NUM, summing counts for words already present.
 *
 * reduceCount: number of mapper results to merge (one per input file).
 *
 * NOTE(review): the body never uses the loop index `i`, so every
 * iteration of the parallel outer loop rescans the same buckets
 * `hashTable[j]` and re-adds the same counts; if `hashTable` holds
 * per-file tables this should likely index by `i` — confirm against
 * the table declaration.
 */
void reducerFunc(int reduceCount)
{
    int i, j;
#pragma omp parallel for
    for (i = 0; i < reduceCount; ++i)
    {
        for (j = 0; j < HASH_TABLE_MAX_SIZE; j++)
        {
            /* Serialize all reads/updates of the shared result table. */
#pragma omp critical
            if (hashTable[j])
            {
                Node* pHead = hashTable[j];
                while (pHead)
                {
                    /* Fixed: look the key up once and reuse the node,
                     * instead of performing the same hash lookup twice. */
                    Node* pFound = hashTableLookupFunc(pHead->Key, FILENAME_NUM);
                    if (pFound == NULL)
                        hashTableInsertFunc(pHead->Key, pHead->Value, FILENAME_NUM);
                    else
                        pFound->Value += pHead->Value;
                    pHead = pHead->pNext;
                }
            }
        }
    }
}
/*
 * Write every (word, count) pair from the merged table (index
 * FILENAME_NUM) to the given stream.
 *
 * fp: open output stream; the caller owns and closes it.
 *
 * NOTE(review): the loop index `i` is never used in the body, so the
 * same chain `hashTable[FILENAME_NUM]` is printed HASH_TABLE_MAX_SIZE
 * times; this likely should iterate buckets (e.g.
 * `hashTable[FILENAME_NUM][i]`) — confirm against the table declaration.
 */
void writerFunc(FILE * fp)
{
    int i;
    Node* p;
    fprintf(fp, "------print the result------ \n");
    /* Fixed: the pragma was misspelled "critial"; an unrecognized
     * pragma is silently ignored, so the intended critical section
     * never took effect. */
#pragma omp critical
    {
        for (i = 0; i < HASH_TABLE_MAX_SIZE; ++i)
        {
            if (hashTable[FILENAME_NUM] != NULL)
            {
                p = hashTable[FILENAME_NUM];
                while (p)
                {
                    fprintf(fp, "Word: %s, Count: %d\n", p->Key, p->Value);
                    p = p->pNext;
                }
            }
        }
    }
}
// Main Function
int main()
{
int file_num = 0;
FILE *read_filename = fopen("filename_list.txt", "r");
char **filename_list_array = (char **)malloc(sizeof(char*)* FILENAME_NUM);
int i,j;
Node* pHead;
#pragma omp parallel for
for (i = 0; i < FILENAME_NUM; i++)
{
filename_list_array = (char *)malloc(sizeof(char)* FILENAME_LEN);
}
if (read_filename == NULL)
{
printf("open the file incorrectly !");
return 0;
}
while (!feof(read_filename))
{
fscanf(read_filename, "%s\n", filename_list_array[file_num]);
file_num++;
}
printf("The result can be found in output.txt .\n");
omp_set_num_threads(8);
double time = -omp_get_wtime();
#pragma omp parallel private (i)
{
#pragma omp single nowait
{
for (i = 0; i < file_num; i++)
{
#pragma omp task
{
readFunc(filename_list_array, i);
}
}
}
}
#pragma omp parallel private (i)
{
#pragma omp single nowait
{
for (i = 0; i < file_num; i++)
{
#pragma omp task
{
mapperFunc(i);
}
}
}
}
#pragma omp barrier
reducerFunc(file_num);
#pragma omp barrier
FILE *fp = fopen("output.txt", "w");
writerFunc(fp);
hashTablePrintFunc(FILENAME_NUM);
#pragma omp parallel for
for (i = 0; i < FILENAME_NUM; i++)
{
for (j = 0; j < HASH_TABLE_MAX_SIZE; j++)
{
if (hashTable[j])
{
pHead = hashTable[j];
if (pHead)
{
free(pHead->Key);
free(pHead);
}
}
}
}
fclose(fp);
time = time + omp_get_wtime();
printf("Elapsed time is %lf seconds. \n", time);
for (i = 0; i < FILENAME_NUM; i++)
{
free(filename_list_array);
}
free(filename_list_array);
return 0;
}
//Guanshi He
//ECE 563
//Small Project
//Matrix Multiply
//MPI Version
//N must be a multiple of the number of worker threads.
//Since the number of workers can be 1, 3, 7, or 15,
//we pick N = 1050, a multiple of their least common multiple, 105.
/* Square matrix dimension: a multiple of 105 (the LCM of the possible
 * worker counts 1, 3, 7, 15) so rows split evenly among workers. */
#define N 1050
/* Operand matrices a and b; the product is accumulated into c. */
double a[N][N],b[N][N],c[N][N];
main(int argc, char *argv[]) {
//MPI useful variable
int size,rank;
//number of worker threads
int numworkers;
//variable for message send and recv
int source,dest;
/*---------------------------- master ----------------------------*/
start = clock();
if (rank == 0) {
for (i=0; i<N; i++) {
for (j=0; j<N; j++) {
a[j]= 1.0;
b[j]= 1.0;
}
}
// send matrix data to the worker threads
rows = N / numworkers;
offset = 0;
for (dest = 1; dest <= numworkers; dest++) {
MPI_Send(&offset, 1, MPI_INT, dest, 1, MPI_COMM_WORLD);
MPI_Send(&rows, 1, MPI_INT, dest, 1, MPI_COMM_WORLD);
MPI_Send(&a[offset][0], rows * N, MPI_DOUBLE, dest, 1, MPI_COMM_WORLD);
MPI_Send(&b, N * N, MPI_DOUBLE, dest, 1, MPI_COMM_WORLD);
offset = offset + rows;
}
// receive the data from other thread
for (i = 1; i<= numworkers; i++) {
source = i;
MPI_Recv(&offset, 1, MPI_INT, source, 2, MPI_COMM_WORLD, &status);
MPI_Recv(&rows, 1, MPI_INT, source, 2, MPI_COMM_WORLD, &status);
MPI_Recv(&c[offset][0], rows * N, MPI_DOUBLE, source, 2, MPI_COMM_WORLD, &status);
}
//uncomment this for displaying the result
// printf("Here is the result matrix:\n");
// for (i=0; i<N; i++) {
// for (j=0; j<N; j++) {
// printf("%.2f ", c[j]);
// }
// printf ("\n");
// }
end = clock();
cpu_time1 = ((double)(end - start)) / CLOCKS_PER_SEC;
//Time the sequtial run
start = clock();
for (i = 0; i < N; i++) {
for (j = 0; j < N; j++) {
for (offset = 0; offset < N; offset++) {
c[j] += a[offset] * b[j][offset];
}
}
}
end = clock();
cpu_time2 = ((double)(end - start)) / CLOCKS_PER_SEC;
//output the results and do the comparation
printf("MPI size : %d\n", size);
printf("Parallel running time : %f\n", cpu_time1);
printf("Sequtial running time : %f\n", cpu_time2);
printf("SpeedUp : %.2f \n", (100*(float)cpu_time2/(float)cpu_time1));
}
/*---------------------------- worker----------------------------*/
if (rank > 0) {
source = 0;
//receving the data from master thread
MPI_Recv(&offset, 1, MPI_INT, source, 1, MPI_COMM_WORLD, &status);
MPI_Recv(&rows, 1, MPI_INT, source, 1, MPI_COMM_WORLD, &status);
MPI_Recv(&a, rows * N, MPI_DOUBLE, source, 1, MPI_COMM_WORLD, &status);
MPI_Recv(&b, N * N, MPI_DOUBLE, source, 1, MPI_COMM_WORLD, &status);
// do the computation
for (k = 0; k < N; k++) {
for (i = 0; i < rows; i++) {
c[k] = 0.0;
for (j = 0; j < N; j++) {
c[k] += a[j] * b[j][k];
}
}
}
//send back the data
MPI_Send(&offset, 1, MPI_INT, 0, 2, MPI_COMM_WORLD);
MPI_Send(&rows, 1, MPI_INT, 0, 2, MPI_COMM_WORLD);
MPI_Send(&c, rows * N, MPI_DOUBLE, 0, 2, MPI_COMM_WORLD);
}
MPI_Finalize();
//double start = omp_get_wtime();
int each_step = 0;
int ex = 0;
//start READER
for(i = 0; i < argc - 1; i++) {
reader(argv[i+1]);
}
//calculate for steps and ex
each_step = global_count / world_size;
ex = global_count % world_size;
if (world_rank == 0) {
//broadcast text
Node rec[world_size][100000];
//initialization
int k = 0;
char * temp = " ";
for (i = 0; i < world_size; i++) {
for (k = 0; k < 100000; k++) {
rec[k].nValue = 0;