|
// Forum note: last edited by matlab的旋律 on 2018-5-3 14:56.
// Note: how to run and test: list the names of all the .txt files you want to process in one text file named "filename_list.txt".
// Put this .c file and filename_list.txt in the same folder, then compile and run.
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <stdlib.h>
#include <omp.h>
/* Sizing constants for the OpenMP word-count program. */
#define FILENAME_LEN 50            /* bytes reserved per input file name, NUL included */
#define FILENAME_NUM 100           /* maximum number of input files */
#define HASH_TABLE_MAX_SIZE 100000 /* number of buckets in each hash table */
#define MAX_LINE_SIZE 100000       /* unit used to size each per-file text buffer */
/* text[f] holds the cleaned words of input file f, each followed by one space. */
char text[FILENAME_NUM][MAX_LINE_SIZE * 50];
/* hash_size[t] counts the nodes inserted into table t. It has one slot more
 * than FILENAME_NUM because reducerFunc inserts into table FILENAME_NUM (the
 * merged table); with only FILENAME_NUM slots that increment wrote past the
 * end of the array. */
int hash_size[FILENAME_NUM + 1];
/* One entry in a hash bucket's singly linked chain: a word, its occurrence
 * count, and the link to the next entry that hashed to the same bucket. */
typedef struct HashTable
{
    char *Key;                /* NUL-terminated word (allocated on insert) */
    int Value;                /* occurrence count */
    struct HashTable *pNext;  /* next node in the same bucket, or NULL */
} Node;
/* hashTable[t][b] is the head of the chain for bucket b of table t.
 * Tables 0 .. FILENAME_NUM-1 are filled by mapperFunc (one table per input
 * file); the extra table FILENAME_NUM receives the merged counts from
 * reducerFunc. */
Node * hashTable[FILENAME_NUM + 1][HASH_TABLE_MAX_SIZE];
/* String hash: starting from 1, fold each character c of key in as
 * h = h * 33 + c. Characters are read as plain char. Returns the unreduced
 * value; callers take it modulo the table size. */
unsigned int hashComputeByHashFunc(const char* key)
{
    unsigned int h = 1;
    const char *cursor;

    for (cursor = key; *cursor != '\0'; ++cursor)
    {
        int ch = *cursor;
        h = h * 33 + ch;
    }
    return h;
}
/*
 * Insert a new (key, value) node at the head of the chain for key's bucket in
 * table `index`, and increment hash_size[index].
 *
 * key   - NUL-terminated word; it is copied, the caller keeps its buffer.
 * value - initial count stored in the node.
 * index - which table to insert into.
 *
 * No duplicate check is made here; callers look the key up first.
 *
 * Compared with the previous version:
 *  - both allocations are checked, and the program stops with a message
 *    instead of dereferencing NULL;
 *  - the probing loop is gone: its result (poscheck) was never used, and it
 *    could never terminate once every bucket of the table was occupied.
 */
void hashTableInsertFunc(const char* key, int value, int index)
{
    size_t key_len = strlen(key);
    unsigned int pos = hashComputeByHashFunc(key) % HASH_TABLE_MAX_SIZE;
    Node *node = malloc(sizeof *node);

    if (node == NULL)
    {
        fprintf(stderr, "hashTableInsertFunc: out of memory\n");
        exit(EXIT_FAILURE);
    }
    node->Key = malloc(key_len + 1);
    if (node->Key == NULL)
    {
        free(node);
        fprintf(stderr, "hashTableInsertFunc: out of memory\n");
        exit(EXIT_FAILURE);
    }
    memcpy(node->Key, key, key_len + 1); /* includes the terminating NUL */
    node->Value = value;

    /* Collisions are resolved by chaining: push onto the bucket's list. */
    node->pNext = hashTable[index][pos];
    hashTable[index][pos] = node;
    hash_size[index]++;
}
/* Find `key` in table `index`: walk the chain of the bucket the key hashes to
 * and return the first node whose Key compares equal, or NULL if there is
 * none. */
Node* hashTableLookupFunc(const char* key, int index)
{
    unsigned int bucket = hashComputeByHashFunc(key) % HASH_TABLE_MAX_SIZE;
    Node *cur;

    for (cur = hashTable[index][bucket]; cur != NULL; cur = cur->pNext)
    {
        if (strcmp(key, cur->Key) == 0)
        {
            return cur;
        }
    }
    return NULL;
}
/*
 * Print every (word, count) pair stored in table `index` to stdout, bucket by
 * bucket, following each bucket's chain.
 *
 * The bucket subscript was missing before (hashTable[index] instead of
 * hashTable[index][i]), so a whole table row was treated as a Node pointer.
 */
void hashTablePrintFunc(int index)
{
    int bucket;

    printf("=========== content of hash table ===========\n");
    for (bucket = 0; bucket < HASH_TABLE_MAX_SIZE; ++bucket)
    {
        const Node *cur;

        for (cur = hashTable[index][bucket]; cur != NULL; cur = cur->pNext)
        {
            printf("Word: %s, count: %d \n", cur->Key, cur->Value);
        }
    }
}
/* Clean a word in place: drop every punctuation character and digit, convert
 * upper-case letters to lower case, keep everything else unchanged. The
 * result is never longer than the input and is NUL-terminated. */
void rmPunct2LowerFunc(char *p)
{
    char *out = p;
    const char *in;

    for (in = p; *in != '\0'; ++in)
    {
        unsigned char ch = (unsigned char)*in;

        if (ispunct(ch) || isdigit(ch))
        {
            continue; /* dropped from the output */
        }
        *out++ = isupper(ch) ? (char)tolower(ch) : *in;
    }
    *out = 0;
}
/*
 * Read `filename` word by word, clean each word with rmPunct2LowerFunc, and
 * append it plus one space to text[index].
 *
 * If the file cannot be opened a message is printed and text[index] is left
 * untouched.
 *
 * Compared with the previous version:
 *  - the scan width is bounded so a long token cannot overflow `word`
 *    (a token longer than 999 characters is read in several pieces);
 *  - appends are bounds-checked against the size of text[index]; input that
 *    does not fit is reported and dropped instead of overrunning the buffer;
 *  - the write position is tracked, so appending no longer rescans the whole
 *    buffer for every word.
 */
void readFunc(const char* filename, int index)
{
    const size_t capacity = sizeof text[index];
    size_t used;
    char word[1000];
    FILE *fp = fopen(filename, "r");

    if (fp == NULL)
    {
        printf("input file is invalid !");
        return;
    }

    used = strlen(text[index]);
    while (fscanf(fp, "%999s", word) == 1)
    {
        size_t len;

        rmPunct2LowerFunc(word);
        len = strlen(word);
        /* room needed: the word, one separating space, the final NUL */
        if (used + len + 2 > capacity)
        {
            fprintf(stderr, "readFunc: %s does not fit in the text buffer, rest of file ignored\n", filename);
            break;
        }
        memcpy(text[index] + used, word, len);
        used += len;
        text[index][used++] = ' ';
        text[index][used] = '\0';
    }
    fclose(fp);
}
void mapperFunc(int index)
{
char *nextWord;
nextWord = strtok(text[index], " \r\n");
while (nextWord != NULL)
{
#pragma omp critical
{
if (hashTableLookupFunc(nextWord, index) == NULL)
{
hashTableInsertFunc(nextWord, 1, index);
}
else
{
hashTableLookupFunc(nextWord, index)->Value++;
}
nextWord = strtok(NULL, " \r\n");
}
}
}
/*
 * Merge the per-file tables 0 .. reduceCount-1 into the combined table
 * FILENAME_NUM: a word seen for the first time is inserted with its count,
 * otherwise its count is added to the existing entry.
 *
 * Compared with the previous version:
 *  - the source table subscript is restored (hashTable[i][j]); it was lost
 *    and a table row was being used as a Node pointer;
 *  - j is declared inside the parallel loop so every thread has its own copy
 *    (it used to be shared between threads);
 *  - the destination entry is looked up once per word instead of twice.
 * All threads update the same destination table, so each update is done in a
 * critical section; the source tables are only read.
 */
void reducerFunc(int reduceCount)
{
    int i;

    #pragma omp parallel for
    for (i = 0; i < reduceCount; ++i)
    {
        int j;

        for (j = 0; j < HASH_TABLE_MAX_SIZE; j++)
        {
            const Node *src;

            for (src = hashTable[i][j]; src != NULL; src = src->pNext)
            {
                #pragma omp critical
                {
                    Node *dst = hashTableLookupFunc(src->Key, FILENAME_NUM);

                    if (dst == NULL)
                    {
                        hashTableInsertFunc(src->Key, src->Value, FILENAME_NUM);
                    }
                    else
                    {
                        dst->Value += src->Value;
                    }
                }
            }
        }
    }
}
/*
 * Write every (word, count) pair of the combined table (table FILENAME_NUM)
 * to `fp`, one pair per line, after a header line. Does nothing if fp is NULL.
 *
 * The bucket subscript is restored (hashTable[FILENAME_NUM][i]); without it
 * a table row was used as a Node pointer. The misspelled
 * "#pragma omp critial" was an unknown pragma with no effect and has been
 * dropped; this function is called from serial code in main.
 */
void writerFunc(FILE * fp)
{
    int i;

    if (fp == NULL)
    {
        return;
    }
    fprintf(fp, "------print the result------ \n");
    for (i = 0; i < HASH_TABLE_MAX_SIZE; ++i)
    {
        const Node *p;

        for (p = hashTable[FILENAME_NUM][i]; p != NULL; p = p->pNext)
        {
            fprintf(fp, "Word: %s, Count: %d\n", p->Key, p->Value);
        }
    }
}
// Main Function
int main()
{
int file_num = 0;
FILE *read_filename = fopen("filename_list.txt", "r");
char **filename_list_array = (char **)malloc(sizeof(char*)* FILENAME_NUM);
int i,j;
Node* pHead;
#pragma omp parallel for
for (i = 0; i < FILENAME_NUM; i++)
{
filename_list_array = (char *)malloc(sizeof(char)* FILENAME_LEN);
}
if (read_filename == NULL)
{
printf("open the file incorrectly !");
return 0;
}
while (!feof(read_filename))
{
fscanf(read_filename, "%s\n", filename_list_array[file_num]);
file_num++;
}
printf("The result can be found in output.txt .\n");
omp_set_num_threads(8);
double time = -omp_get_wtime();
#pragma omp parallel private (i)
{
#pragma omp single nowait
{
for (i = 0; i < file_num; i++)
{
#pragma omp task
{
readFunc(filename_list_array, i);
}
}
}
}
#pragma omp parallel private (i)
{
#pragma omp single nowait
{
for (i = 0; i < file_num; i++)
{
#pragma omp task
{
mapperFunc(i);
}
}
}
}
#pragma omp barrier
reducerFunc(file_num);
#pragma omp barrier
FILE *fp = fopen("output.txt", "w");
writerFunc(fp);
hashTablePrintFunc(FILENAME_NUM);
#pragma omp parallel for
for (i = 0; i < FILENAME_NUM; i++)
{
for (j = 0; j < HASH_TABLE_MAX_SIZE; j++)
{
if (hashTable[j])
{
pHead = hashTable[j];
if (pHead)
{
free(pHead->Key);
free(pHead);
}
}
}
}
fclose(fp);
time = time + omp_get_wtime();
printf("Elapsed time is %lf seconds. \n", time);
for (i = 0; i < FILENAME_NUM; i++)
{
free(filename_list_array);
}
free(filename_list_array);
return 0;
}
//Guanshi He
//ECE 563
//Small Project
//Matrix Multiply
//MPI Version
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
// N is chosen as a multiple of the number of worker processes so that the rows divide evenly.
// The worker counts used are 1, 3, 7 and 15, whose least common multiple is 105,
// so the matrix size is set to a multiple of 105 (here 10 * 105 = 1050).
// Matrix dimension: a, b and c are all N x N.
#define N 1050
// The three global matrices: a and b are the operands, c receives the product.
double a[N][N],b[N][N],c[N][N];
/*
 * MPI matrix multiply: rank 0 (master) fills a and b with 1.0, hands a block
 * of rows of a plus all of b to each worker rank, collects the matching rows
 * of c, then repeats the multiplication sequentially and prints both timings.
 *
 * Message tags: 1 = master -> worker, 2 = worker -> master. Each direction
 * carries offset, row count, then the matrix data, in that order.
 *
 * Fixes relative to the previous version:
 *  - main has an explicit int return type and returns a value;
 *  - the [i] subscripts are restored in the fill loop, the worker multiply
 *    and the sequential multiply;
 *  - running with a single process (no workers) is reported instead of
 *    dividing by zero;
 *  - when N is not divisible by the worker count, the remaining rows are
 *    spread over the first workers instead of being left uncomputed;
 *  - the sequential reference now computes c = a * b from scratch (it used
 *    to add a * transpose(b) on top of the parallel result);
 *  - timing uses MPI_Wtime (wall clock) rather than clock(), which only
 *    measures CPU time consumed by rank 0.
 */
int main(int argc, char *argv[]) {
    //MPI useful variable
    int size, rank;
    //number of worker processes
    int numworkers;
    //variables for message send and recv
    int source, dest;
    int rows, offset;
    int i, j, k;
    double start, end;
    double cpu_time1, cpu_time2;
    MPI_Status status;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    numworkers = size - 1;

    if (numworkers < 1) {
        if (rank == 0) {
            fprintf(stderr, "This program needs at least 2 MPI processes (1 master + 1 worker).\n");
        }
        MPI_Finalize();
        return EXIT_FAILURE;
    }

    /*---------------------------- master ----------------------------*/
    if (rank == 0) {
        int base_rows = N / numworkers;
        int extra_rows = N % numworkers;

        start = MPI_Wtime();
        for (i = 0; i < N; i++) {
            for (j = 0; j < N; j++) {
                a[i][j] = 1.0;
                b[i][j] = 1.0;
            }
        }
        // send matrix data to the workers
        offset = 0;
        for (dest = 1; dest <= numworkers; dest++) {
            // the first extra_rows workers take one additional row
            rows = base_rows + (dest <= extra_rows ? 1 : 0);
            MPI_Send(&offset, 1, MPI_INT, dest, 1, MPI_COMM_WORLD);
            MPI_Send(&rows, 1, MPI_INT, dest, 1, MPI_COMM_WORLD);
            MPI_Send(&a[offset][0], rows * N, MPI_DOUBLE, dest, 1, MPI_COMM_WORLD);
            MPI_Send(&b[0][0], N * N, MPI_DOUBLE, dest, 1, MPI_COMM_WORLD);
            offset = offset + rows;
        }
        // receive the result rows from every worker
        for (source = 1; source <= numworkers; source++) {
            MPI_Recv(&offset, 1, MPI_INT, source, 2, MPI_COMM_WORLD, &status);
            MPI_Recv(&rows, 1, MPI_INT, source, 2, MPI_COMM_WORLD, &status);
            MPI_Recv(&c[offset][0], rows * N, MPI_DOUBLE, source, 2, MPI_COMM_WORLD, &status);
        }
        //uncomment this for displaying the result
        // printf("Here is the result matrix:\n");
        // for (i=0; i<N; i++) {
        //     for (j=0; j<N; j++) {
        //         printf("%.2f ", c[i][j]);
        //     }
        //     printf ("\n");
        // }
        end = MPI_Wtime();
        cpu_time1 = end - start;

        // time the sequential run
        start = MPI_Wtime();
        for (i = 0; i < N; i++) {
            for (j = 0; j < N; j++) {
                double sum = 0.0;
                for (k = 0; k < N; k++) {
                    sum += a[i][k] * b[k][j];
                }
                c[i][j] = sum;
            }
        }
        end = MPI_Wtime();
        cpu_time2 = end - start;

        //output the results and do the comparison
        printf("MPI size : %d\n", size);
        printf("Parallel running time : %f\n", cpu_time1);
        printf("Sequtial running time : %f\n", cpu_time2);
        // printed as a percentage: 100 * sequential / parallel
        printf("SpeedUp : %.2f \n", (100*(float)cpu_time2/(float)cpu_time1));
    }
    /*---------------------------- worker----------------------------*/
    else {
        source = 0;
        // receive this worker's rows of a (stored from row 0 of the local a) and all of b
        MPI_Recv(&offset, 1, MPI_INT, source, 1, MPI_COMM_WORLD, &status);
        MPI_Recv(&rows, 1, MPI_INT, source, 1, MPI_COMM_WORLD, &status);
        MPI_Recv(&a[0][0], rows * N, MPI_DOUBLE, source, 1, MPI_COMM_WORLD, &status);
        MPI_Recv(&b[0][0], N * N, MPI_DOUBLE, source, 1, MPI_COMM_WORLD, &status);
        // do the computation: local rows of c = local rows of a times b
        for (i = 0; i < rows; i++) {
            for (k = 0; k < N; k++) {
                double sum = 0.0;
                for (j = 0; j < N; j++) {
                    sum += a[i][j] * b[j][k];
                }
                c[i][k] = sum;
            }
        }
        // send back offset, row count and the computed rows
        MPI_Send(&offset, 1, MPI_INT, 0, 2, MPI_COMM_WORLD);
        MPI_Send(&rows, 1, MPI_INT, 0, 2, MPI_COMM_WORLD);
        MPI_Send(&c[0][0], rows * N, MPI_DOUBLE, 0, 2, MPI_COMM_WORLD);
    }
    MPI_Finalize();
    return 0;
}
// Result from the output
// he95@scholar-fe02:~/SmallProj $ mpiexec -n 2 ./a.out
// MPI size : 2
// Parallel running time : 3.700000
// Sequtial running time : 3.350000
// SpeedUp : 90.54
// he95@scholar-fe02:~/SmallProj $ mpiexec -n 4 ./a.out
// MPI size : 4
// Parallel running time : 1.270000
// Sequtial running time : 3.380000
// SpeedUp : 266.14
// he95@scholar-fe02:~/SmallProj $ mpiexec -n 8 ./a.out
// MPI size : 8
// Parallel running time : 0.620000
// Sequtial running time : 3.800000
// SpeedUp : 612.90
// he95@scholar-fe02:~/SmallProj $ mpiexec -n 16 ./a.out
// MPI size : 16
// Parallel running time : 0.360000
// Sequtial running time : 3.800000
// SpeedUp : 1055.56
//Team Member:
//Haichao Xu
//Guanshi He
//ECE563 Large Project Final Turnin MPI Version
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
// Modulus used to pick a bucket, and also the cap on the total number of
// nodes hash_table_insert will create (see hash_table_size).
#define HASH_TABLE_MAX_SIZE 10000
// Not referenced anywhere in the visible code.
#define number_of_processor 4
// MAX_LINE_SIZE * LINE_NUM is the size of the shared text buffer.
#define MAX_LINE_SIZE 100000
// Number of rows (tables) in hashTable.
#define MAX_FILE_NUM 32
#define LINE_NUM 40
// Not referenced anywhere in the visible code.
int flag[2];
// Number of words appended to text by reader(); main splits this count between the ranks.
int global_count = 0;
// All cleaned input words, each followed by a single space.
char text[MAX_LINE_SIZE * LINE_NUM];
// Number of nodes inserted so far, counted across all tables.
int hash_table_size; //the number of key-value pairs in the hash table!
/* One hash table entry: the word stored inline (so the whole node can be
 * copied as a flat record), its count, and a link to another node. */
typedef struct HashTable
{
    char sKey[40];            /* NUL-terminated word */
    int nValue;               /* occurrence count */
    struct HashTable *pNext;  /* next node, or NULL */
} Node;
/* hashTable[t][b] is bucket b of table t; valid rows are 0 .. MAX_FILE_NUM-1.
 * Bucket positions are computed modulo HASH_TABLE_MAX_SIZE, so insert and
 * lookup only ever touch the first HASH_TABLE_MAX_SIZE columns of a row. */
Node* hashTable[MAX_FILE_NUM][HASH_TABLE_MAX_SIZE * LINE_NUM]; //hash table data structure
/* String hash function: starting from 5381, fold each character c of skey in
 * as h = h * 33 + c. Characters are read as plain char. Returns the
 * unreduced value; callers take it modulo the table size. */
unsigned int hash_table_hash_str(const char* skey)
{
    unsigned int h = 5381;
    const char *cursor = skey;

    while (*cursor != '\0')
    {
        int ch = *cursor++;
        h = h * 33 + ch;
    }
    return h;
}
//insert key-value into hash table
void hash_table_insert(const char* skey, int nvalue, int index)
{
unsigned int pos = hash_table_hash_str(skey) % HASH_TABLE_MAX_SIZE;
Node* pNewNode = (Node*)malloc(sizeof(Node));
memset(pNewNode, 0, sizeof(Node));
if(hash_table_size >= HASH_TABLE_MAX_SIZE)
{
return;
}
//pNewNode->sKey = (char*)malloc(sizeof(char) * (strlen(skey) + 1));
strcpy(pNewNode->sKey, skey);
pNewNode->nValue = nvalue;
//pNewNode->pNext = hashTable[pos];
hashTable[index][pos] = pNewNode;
hash_table_size++;
}
//lookup a key in the hash table
Node* hash_table_lookup(const char* skey, int index)
{
unsigned int pos = hash_table_hash_str(skey) % HASH_TABLE_MAX_SIZE;
Node* pHead;
if(hashTable[index][pos])
{
pHead = hashTable[index][pos];
if (pHead)
{
if(strcmp(skey, pHead->sKey) == 0)
return pHead;
}
}
return NULL;
}
/* Clean a word in place: punctuation characters and digits are removed,
 * upper-case letters become lower case, every other character is kept. The
 * result is NUL-terminated and never longer than the input. */
void remove_punct_and_make_lower_case(char *p)
{
    char *out = p;
    const char *in;

    for (in = p; *in != '\0'; ++in) {
        unsigned char ch = (unsigned char)*in;

        if (ispunct(ch) || isdigit(ch)) {
            continue; /* dropped from the output */
        }
        *out++ = isupper(ch) ? (char)tolower(ch) : *in;
    }
    *out = 0;
}
/*
 * Reader: append the cleaned words of `filename` to the global text buffer,
 * each followed by one space, and add the number of appended words to
 * global_count.
 *
 * Terminates the process with exit(1) if the file cannot be opened.
 * NOTE(review): main calls this after MPI_Init, so that exit skips
 * MPI_Finalize; consider MPI_Abort there.
 *
 * Compared with the previous version:
 *  - appends are bounds-checked against the size of text; input that does
 *    not fit is reported and dropped instead of overrunning the buffer;
 *  - the write position is tracked instead of rescanning text on every
 *    strcat;
 *  - a token that is empty after cleaning (only digits/punctuation) is
 *    skipped. It used to be counted in global_count although the tokenizer
 *    later never yields it, which skewed the per-rank word ranges.
 */
void reader(const char* filename)
{
    const size_t capacity = sizeof text;
    size_t used;
    char word[1024] = {0};
    FILE *fp = fopen(filename, "r");

    if (fp == NULL) {
        printf("invalid input file");
        exit(1);
    }
    used = strlen(text);
    //read word by word
    while (fscanf(fp, " %1023s", word) == 1) {
        size_t len;

        //strip punctuation/digits and convert to lower case
        remove_punct_and_make_lower_case(word);
        len = strlen(word);
        if (len == 0) {
            continue;
        }
        /* room needed: the word, one separating space, the final NUL */
        if (used + len + 2 > capacity) {
            fprintf(stderr, "reader: %s does not fit in the text buffer, rest of file ignored\n", filename);
            break;
        }
        memcpy(text + used, word, len);
        used += len;
        text[used++] = ' ';
        text[used] = '\0';
        global_count++;
    }
    fclose(fp);
}
// Mapper function
void mapper(int each_step, int ex, int pid) {
char *nextWord;
char * ptr = NULL;
int count = 0;
nextWord = strtok_r(text, " \r\n", &ptr);
while(nextWord != NULL) {
if(count < pid * each_step + ex) {
nextWord = strtok_r(NULL, " \r\n", &ptr);
}
else
break;
count++;
}
count = 0;
while (nextWord != NULL && count <= pid * each_step + ex) {
if (hash_table_lookup(nextWord, pid) == NULL) {
hash_table_insert(nextWord, 1, pid);
} else {
hash_table_lookup(nextWord, pid)->nValue++;
}
count++;
nextWord = strtok_r(NULL, " \r\n", &ptr);
}
}
// Reducer function
void reducer(int reduceCount) {
int i, j, val;
Node* pHead;
for(i = 0; i < reduceCount; ++i)
{
for (j = 0; j < HASH_TABLE_MAX_SIZE; j++) {
if(hashTable[j])
{
pHead = hashTable[j];
if(pHead)
{
if (hash_table_lookup(pHead->sKey, MAX_FILE_NUM) == NULL)
hash_table_insert(pHead->sKey, pHead->nValue, MAX_FILE_NUM);
else {
val = pHead -> nValue;
hash_table_lookup(pHead->sKey, MAX_FILE_NUM)->nValue += val;
}
}
}
}
}
}
void writer(FILE * f) {
int i, j;
Node* p;
fprintf(f, "====== Printing out the result =====\n");
for(i = 0; i < HASH_TABLE_MAX_SIZE*LINE_NUM; i++)
{
if(hashTable[0] != NULL)
{
p = hashTable[0];
if(p)
{
fprintf(f, "Word: %s, Count: %d\n", p->sKey, p->nValue);
}
}
}
}
/* ============================test function ============================*/
int main(int argc, char** argv) {
// Initialize the MPI environment
int world_size, world_rank,j;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
MPI_Status status[16];
MPI_Request request[16];
int nthreads, tid;
srand(time(NULL));
hash_table_size = 0;
memset(hashTable, 0, sizeof(Node*) * HASH_TABLE_MAX_SIZE);
int i = 0;
//debug starts here
MPI_Datatype Particletype;
MPI_Datatype type[2] = { MPI_INT, MPI_BYTE};
int blocklen[2] = {40, 1};
MPI_Aint offsets[2];
offsets[0] = 0;
offsets[1] = 40;
MPI_Type_create_struct(2, blocklen, offsets, type, &Particletype);
MPI_Type_commit(&Particletype);
//double start = omp_get_wtime();
int each_step = 0;
int ex = 0;
//start READER
for(i = 0; i < argc - 1; i++) {
reader(argv[i+1]);
}
//calculate for steps and ex
each_step = global_count / world_size;
ex = global_count % world_size;
if (world_rank == 0) {
//broadcast text
Node rec[world_size][100000];
//initialization
int k = 0;
char * temp = " ";
for (i = 0; i < world_size; i++) {
for (k = 0; k < 100000; k++) {
rec[k].nValue = 0;
strcpy(rec[k].sKey, temp);
}
}
MPI_Bcast(text, MAX_LINE_SIZE * LINE_NUM, MPI_BYTE, 0, MPI_COMM_WORLD);
//mapper
char *nextWord;
char * ptr = NULL;
int count = 0;
nextWord = strtok_r(text, " \r\n", &ptr);
while (nextWord != NULL && count < each_step + ex) {
#pragma omp critical
{
if (hash_table_lookup(nextWord, 0) == NULL) {
hash_table_insert(nextWord, 1, 0);
} else {
hash_table_lookup(nextWord, 0)->nValue++;
}
count++;
nextWord = strtok_r(NULL, " \r\n", &ptr);
}
}
i = 1;
while(i < world_size) {
MPI_Irecv(&rec, 100000, Particletype, i, i, MPI_COMM_WORLD, request+i-1);
i++;
}
//printf("%s \n %d \n %d \n", text, each_step, global_count);
MPI_Waitall(world_size-1, request, status);
//reduce
int j = 0;
for (i = 1; i < world_size; i++) {
for (j = 0; j < 100000; j++) {
if(rec[j].nValue > 0) {
if (hash_table_lookup(rec[j].sKey, 0) == NULL) {
hash_table_insert(rec[j].sKey, rec[j].nValue, 0);
} else {
//printf("%s \n %d \n", rec[j].sKey, rec[j].nValue);
hash_table_lookup(rec[j].sKey, 0)->nValue += rec[j].nValue;
}
}
}
}
//writer thread
FILE *fp = fopen("output.txt", "w");
writer(fp);
//double end = omp_get_wtime();
printf("%f \n", end - start);
}
else if(world_rank != 0) {
MPI_Bcast(text, MAX_LINE_SIZE * LINE_NUM, MPI_BYTE, 0, MPI_COMM_WORLD);
//mapper
Node send[100000];
int index = world_rank;
int count = 0;
char* skey;
mapper(each_step, ex, world_rank);
//copy to send
for(i = 0; i < HASH_TABLE_MAX_SIZE; ++i)
if(hashTable[index])
{
Node* pHead = hashTable[index];
if (pHead)
{
send[count].nValue = pHead->nValue;
skey = pHead->sKey;
strcpy(send[count].sKey, skey);
count++;
}
}
MPI_Send(&send, count, Particletype, 0, world_rank, MPI_COMM_WORLD);
}
j = 0;
for(i = 0; i < HASH_TABLE_MAX_SIZE; ++i)//free the memory of the hash table
{
if(j >= 31) break;
if(hashTable[j])
{
j++;
Node* pHead = hashTable[j];
if (pHead)
{
Node* pTemp = pHead;
//pHead = pHead->pNext;
if(pTemp)
{
free(pTemp->sKey);
free(pTemp);
}
}
}
}
// Finalize the MPI environment.
MPI_Finalize();
}
|
|